f2daf0d50e360f050e3146431b1fede07833ddf3
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This library is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU Lesser General Public License, version 2.1 only,
6 * as published by the Free Software Foundation.
7 *
8 * This library is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
11 * for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this library; if not, write to the Free Software Foundation,
15 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17
18 #include <lttng/notification/notification-internal.h>
19 #include <lttng/notification/channel-internal.h>
20 #include <lttng/condition/condition-internal.h>
21 #include <lttng/endpoint.h>
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/dynamic-buffer.h>
25 #include <common/utils.h>
26 #include <common/defaults.h>
27 #include <assert.h>
28 #include "lttng-ctl-helper.h"
29
30 static
31 int handshake(struct lttng_notification_channel *channel);
32
33 /*
34 * Populates the reception buffer with the next complete message.
35 * The caller must acquire the client's lock.
36 */
37 static
38 int receive_message(struct lttng_notification_channel *channel)
39 {
40 ssize_t ret;
41 struct lttng_notification_channel_message msg;
42
43 ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0);
44 if (ret) {
45 goto error;
46 }
47
48 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
49 if (ret <= 0) {
50 ret = -1;
51 goto error;
52 }
53
54 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
55 ret = -1;
56 goto error;
57 }
58
59 /* Add message header at buffer's start. */
60 ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg,
61 sizeof(msg));
62 if (ret) {
63 goto error;
64 }
65
66 /* Reserve space for the payload. */
67 ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer,
68 channel->reception_buffer.size + msg.size);
69 if (ret) {
70 goto error;
71 }
72
73 /* Receive message payload. */
74 ret = lttcomm_recv_unix_sock(channel->socket,
75 channel->reception_buffer.data + sizeof(msg), msg.size);
76 if (ret < (ssize_t) msg.size) {
77 ret = -1;
78 goto error;
79 }
80 ret = 0;
81 end:
82 return ret;
83 error:
84 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
85 ret = -1;
86 }
87 goto end;
88 }
89
90 static
91 enum lttng_notification_channel_message_type get_current_message_type(
92 struct lttng_notification_channel *channel)
93 {
94 struct lttng_notification_channel_message *msg;
95
96 assert(channel->reception_buffer.size >= sizeof(*msg));
97
98 msg = (struct lttng_notification_channel_message *)
99 channel->reception_buffer.data;
100 return (enum lttng_notification_channel_message_type) msg->type;
101 }
102
103 static
104 struct lttng_notification *create_notification_from_current_message(
105 struct lttng_notification_channel *channel)
106 {
107 ssize_t ret;
108 struct lttng_notification *notification = NULL;
109 struct lttng_buffer_view view;
110
111 if (channel->reception_buffer.size <=
112 sizeof(struct lttng_notification_channel_message)) {
113 goto end;
114 }
115
116 view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer,
117 sizeof(struct lttng_notification_channel_message), -1);
118
119 ret = lttng_notification_create_from_buffer(&view, &notification);
120 if (ret != channel->reception_buffer.size -
121 sizeof(struct lttng_notification_channel_message)) {
122 lttng_notification_destroy(notification);
123 notification = NULL;
124 goto end;
125 }
126 end:
127 return notification;
128 }
129
130 struct lttng_notification_channel *lttng_notification_channel_create(
131 struct lttng_endpoint *endpoint)
132 {
133 int fd, ret;
134 bool is_in_tracing_group = false, is_root = false;
135 char *sock_path = NULL;
136 struct lttng_notification_channel *channel = NULL;
137
138 if (!endpoint ||
139 endpoint != lttng_session_daemon_notification_endpoint) {
140 goto end;
141 }
142
143 sock_path = zmalloc(LTTNG_PATH_MAX);
144 if (!sock_path) {
145 goto end;
146 }
147
148 channel = zmalloc(sizeof(struct lttng_notification_channel));
149 if (!channel) {
150 goto end;
151 }
152 channel->socket = -1;
153 pthread_mutex_init(&channel->lock, NULL);
154 lttng_dynamic_buffer_init(&channel->reception_buffer);
155 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
156
157 is_root = (getuid() == 0);
158 if (!is_root) {
159 is_in_tracing_group = lttng_check_tracing_group();
160 }
161
162 if (is_root || is_in_tracing_group) {
163 lttng_ctl_copy_string(sock_path,
164 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
165 LTTNG_PATH_MAX);
166 ret = lttcomm_connect_unix_sock(sock_path);
167 if (ret >= 0) {
168 fd = ret;
169 goto set_fd;
170 }
171 }
172
173 /* Fallback to local session daemon. */
174 ret = snprintf(sock_path, LTTNG_PATH_MAX,
175 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
176 utils_get_home_dir());
177 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
178 goto error;
179 }
180
181 ret = lttcomm_connect_unix_sock(sock_path);
182 if (ret < 0) {
183 goto error;
184 }
185 fd = ret;
186
187 set_fd:
188 channel->socket = fd;
189
190 ret = handshake(channel);
191 if (ret) {
192 goto error;
193 }
194 end:
195 free(sock_path);
196 return channel;
197 error:
198 lttng_notification_channel_destroy(channel);
199 channel = NULL;
200 goto end;
201 }
202
203 enum lttng_notification_channel_status
204 lttng_notification_channel_get_next_notification(
205 struct lttng_notification_channel *channel,
206 struct lttng_notification **_notification)
207 {
208 int ret;
209 struct lttng_notification *notification = NULL;
210 enum lttng_notification_channel_status status =
211 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
212
213 if (!channel || !_notification) {
214 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
215 goto end;
216 }
217
218 pthread_mutex_lock(&channel->lock);
219
220 if (channel->pending_notifications.count) {
221 struct pending_notification *pending_notification;
222
223 assert(!cds_list_empty(&channel->pending_notifications.list));
224
225 /* Deliver one of the pending notifications. */
226 pending_notification = cds_list_first_entry(
227 &channel->pending_notifications.list,
228 struct pending_notification,
229 node);
230 notification = pending_notification->notification;
231 if (!notification) {
232 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
233 }
234 cds_list_del(&pending_notification->node);
235 channel->pending_notifications.count--;
236 free(pending_notification);
237 goto end_unlock;
238 }
239
240 ret = receive_message(channel);
241 if (ret) {
242 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
243 goto end_unlock;
244 }
245
246 switch (get_current_message_type(channel)) {
247 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
248 notification = create_notification_from_current_message(
249 channel);
250 if (!notification) {
251 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
252 goto end_unlock;
253 }
254 break;
255 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
256 /* No payload to consume. */
257 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
258 break;
259 default:
260 /* Protocol error. */
261 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
262 goto end_unlock;
263 }
264
265 end_unlock:
266 pthread_mutex_unlock(&channel->lock);
267 end:
268 if (_notification) {
269 *_notification = notification;
270 }
271 return status;
272 }
273
274 static
275 int enqueue_dropped_notification(
276 struct lttng_notification_channel *channel)
277 {
278 int ret = 0;
279 struct pending_notification *pending_notification;
280 struct cds_list_head *last_element =
281 channel->pending_notifications.list.prev;
282
283 pending_notification = caa_container_of(last_element,
284 struct pending_notification, node);
285 if (!pending_notification->notification) {
286 /*
287 * The last enqueued notification indicates dropped
288 * notifications; there is nothing to do as we group
289 * dropped notifications together.
290 */
291 goto end;
292 }
293
294 if (channel->pending_notifications.count >=
295 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
296 pending_notification->notification) {
297 /*
298 * Discard the last enqueued notification to indicate
299 * that notifications were dropped at this point.
300 */
301 lttng_notification_destroy(
302 pending_notification->notification);
303 pending_notification->notification = NULL;
304 goto end;
305 }
306
307 pending_notification = zmalloc(sizeof(*pending_notification));
308 if (!pending_notification) {
309 ret = -1;
310 goto end;
311 }
312 CDS_INIT_LIST_HEAD(&pending_notification->node);
313 cds_list_add(&pending_notification->node,
314 &channel->pending_notifications.list);
315 channel->pending_notifications.count++;
316 end:
317 return ret;
318 }
319
320 static
321 int enqueue_notification_from_current_message(
322 struct lttng_notification_channel *channel)
323 {
324 int ret = 0;
325 struct lttng_notification *notification;
326 struct pending_notification *pending_notification;
327
328 if (channel->pending_notifications.count >=
329 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
330 /* Drop the notification. */
331 ret = enqueue_dropped_notification(channel);
332 goto end;
333 }
334
335 pending_notification = zmalloc(sizeof(*pending_notification));
336 if (!pending_notification) {
337 ret = -1;
338 goto error;
339 }
340 CDS_INIT_LIST_HEAD(&pending_notification->node);
341
342 notification = create_notification_from_current_message(channel);
343 if (!notification) {
344 ret = -1;
345 goto error;
346 }
347
348 pending_notification->notification = notification;
349 cds_list_add(&pending_notification->node,
350 &channel->pending_notifications.list);
351 channel->pending_notifications.count++;
352 end:
353 return ret;
354 error:
355 free(pending_notification);
356 goto end;
357 }
358
359 static
360 int receive_command_reply(struct lttng_notification_channel *channel,
361 enum lttng_notification_channel_status *status)
362 {
363 int ret;
364 struct lttng_notification_channel_command_reply *reply;
365
366 while (true) {
367 enum lttng_notification_channel_message_type msg_type;
368
369 ret = receive_message(channel);
370 if (ret) {
371 goto end;
372 }
373
374 msg_type = get_current_message_type(channel);
375 switch (msg_type) {
376 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
377 goto exit_loop;
378 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
379 ret = enqueue_notification_from_current_message(
380 channel);
381 if (ret) {
382 goto end;
383 }
384 break;
385 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
386 ret = enqueue_dropped_notification(channel);
387 if (ret) {
388 goto end;
389 }
390 break;
391 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
392 {
393 struct lttng_notification_channel_command_handshake *handshake;
394
395 handshake = (struct lttng_notification_channel_command_handshake *)
396 (channel->reception_buffer.data +
397 sizeof(struct lttng_notification_channel_message));
398 channel->version.major = handshake->major;
399 channel->version.minor = handshake->minor;
400 channel->version.set = true;
401 break;
402 }
403 default:
404 ret = -1;
405 goto end;
406 }
407 }
408
409 exit_loop:
410 if (channel->reception_buffer.size <
411 (sizeof(struct lttng_notification_channel_message) +
412 sizeof(*reply))) {
413 /* Invalid message received. */
414 ret = -1;
415 goto end;
416 }
417
418 reply = (struct lttng_notification_channel_command_reply *)
419 (channel->reception_buffer.data +
420 sizeof(struct lttng_notification_channel_message));
421 *status = (enum lttng_notification_channel_status) reply->status;
422 end:
423 return ret;
424 }
425
426 static
427 int handshake(struct lttng_notification_channel *channel)
428 {
429 ssize_t ret;
430 enum lttng_notification_channel_status status =
431 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
432 struct lttng_notification_channel_command_handshake handshake = {
433 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
434 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
435 };
436 struct lttng_notification_channel_message msg_header = {
437 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
438 .size = sizeof(handshake),
439 };
440 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
441
442 memcpy(send_buffer, &msg_header, sizeof(msg_header));
443 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
444
445 pthread_mutex_lock(&channel->lock);
446
447 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
448 sizeof(send_buffer));
449 if (ret < 0) {
450 goto end_unlock;
451 }
452
453 /* Receive handshake info from the sessiond. */
454 ret = receive_command_reply(channel, &status);
455 if (ret < 0) {
456 goto end_unlock;
457 }
458
459 if (!channel->version.set) {
460 ret = -1;
461 goto end_unlock;
462 }
463
464 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
465 ret = -1;
466 goto end_unlock;
467 }
468
469 end_unlock:
470 pthread_mutex_unlock(&channel->lock);
471 return ret;
472 }
473
474 static
475 enum lttng_notification_channel_status send_condition_command(
476 struct lttng_notification_channel *channel,
477 enum lttng_notification_channel_message_type type,
478 const struct lttng_condition *condition)
479 {
480 int socket;
481 ssize_t command_size, ret;
482 enum lttng_notification_channel_status status =
483 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
484 char *command_buffer = NULL;
485 struct lttng_notification_channel_message cmd_message = {
486 .type = type,
487 };
488
489 if (!channel) {
490 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
491 goto end;
492 }
493
494 assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
495 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
496
497 pthread_mutex_lock(&channel->lock);
498 socket = channel->socket;
499 if (!lttng_condition_validate(condition)) {
500 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
501 goto end_unlock;
502 }
503
504 ret = lttng_condition_serialize(condition, NULL);
505 if (ret < 0) {
506 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
507 goto end_unlock;
508 }
509 assert(ret < UINT32_MAX);
510 cmd_message.size = (uint32_t) ret;
511 command_size = ret + sizeof(
512 struct lttng_notification_channel_message);
513 command_buffer = zmalloc(command_size);
514 if (!command_buffer) {
515 goto end_unlock;
516 }
517
518 memcpy(command_buffer, &cmd_message, sizeof(cmd_message));
519 ret = lttng_condition_serialize(condition,
520 command_buffer + sizeof(cmd_message));
521 if (ret < 0) {
522 goto end_unlock;
523 }
524
525 ret = lttcomm_send_unix_sock(socket, command_buffer, command_size);
526 if (ret < 0) {
527 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
528 goto end_unlock;
529 }
530
531 ret = receive_command_reply(channel, &status);
532 if (ret < 0) {
533 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
534 goto end_unlock;
535 }
536 end_unlock:
537 pthread_mutex_unlock(&channel->lock);
538 end:
539 free(command_buffer);
540 return status;
541 }
542
543 enum lttng_notification_channel_status lttng_notification_channel_subscribe(
544 struct lttng_notification_channel *channel,
545 const struct lttng_condition *condition)
546 {
547 return send_condition_command(channel,
548 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
549 condition);
550 }
551
552 enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
553 struct lttng_notification_channel *channel,
554 const struct lttng_condition *condition)
555 {
556 return send_condition_command(channel,
557 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
558 condition);
559 }
560
561 void lttng_notification_channel_destroy(
562 struct lttng_notification_channel *channel)
563 {
564 if (!channel) {
565 return;
566 }
567
568 if (channel->socket >= 0) {
569 (void) lttcomm_close_unix_sock(channel->socket);
570 }
571 pthread_mutex_destroy(&channel->lock);
572 lttng_dynamic_buffer_reset(&channel->reception_buffer);
573 free(channel);
574 }
This page took 0.040839 seconds and 4 git commands to generate.