Use consumer fd reference in consumer socket obj
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26 #include <inttypes.h>
27
28 #include <common/common.h>
29 #include <common/defaults.h>
30 #include <common/uri.h>
31
32 #include "consumer.h"
33 #include "health.h"
34 #include "ust-app.h"
35 #include "utils.h"
36
37 /*
38 * Receive a reply command status message from the consumer. Consumer socket
39 * lock MUST be acquired before calling this function.
40 *
41 * Return 0 on success, -1 on recv error or a negative lttng error code which
42 * was possibly returned by the consumer.
43 */
44 int consumer_recv_status_reply(struct consumer_socket *sock)
45 {
46 int ret;
47 struct lttcomm_consumer_status_msg reply;
48
49 assert(sock);
50
51 ret = lttcomm_recv_unix_sock(*sock->fd, &reply, sizeof(reply));
52 if (ret <= 0) {
53 if (ret == 0) {
54 /* Orderly shutdown. Don't return 0 which means success. */
55 ret = -1;
56 }
57 /* The above call will print a PERROR on error. */
58 DBG("Fail to receive status reply on sock %d", *sock->fd);
59 goto end;
60 }
61
62 if (reply.ret_code == LTTNG_OK) {
63 /* All good. */
64 ret = 0;
65 } else {
66 ret = -reply.ret_code;
67 DBG("Consumer ret code %d", ret);
68 }
69
70 end:
71 return ret;
72 }
73
74 /*
75 * Once the ASK_CHANNEL command is sent to the consumer, the channel
76 * information are sent back. This call receives that data and populates key
77 * and stream_count.
78 *
79 * On success return 0 and both key and stream_count are set. On error, a
80 * negative value is sent back and both parameters are untouched.
81 */
82 int consumer_recv_status_channel(struct consumer_socket *sock,
83 uint64_t *key, unsigned int *stream_count)
84 {
85 int ret;
86 struct lttcomm_consumer_status_channel reply;
87
88 assert(sock);
89 assert(stream_count);
90 assert(key);
91
92 ret = lttcomm_recv_unix_sock(*sock->fd, &reply, sizeof(reply));
93 if (ret <= 0) {
94 if (ret == 0) {
95 /* Orderly shutdown. Don't return 0 which means success. */
96 ret = -1;
97 }
98 /* The above call will print a PERROR on error. */
99 DBG("Fail to receive status reply on sock %d", *sock->fd);
100 goto end;
101 }
102
103 /* An error is possible so don't touch the key and stream_count. */
104 if (reply.ret_code != LTTNG_OK) {
105 ret = -1;
106 goto end;
107 }
108
109 *key = reply.key;
110 *stream_count = reply.stream_count;
111
112 end:
113 return ret;
114 }
115
116 /*
117 * Send destroy relayd command to consumer.
118 *
119 * On success return positive value. On error, negative value.
120 */
121 int consumer_send_destroy_relayd(struct consumer_socket *sock,
122 struct consumer_output *consumer)
123 {
124 int ret;
125 struct lttcomm_consumer_msg msg;
126
127 assert(consumer);
128 assert(sock);
129
130 DBG2("Sending destroy relayd command to consumer sock %d", *sock->fd);
131
132 /* Bail out if consumer is disabled */
133 if (!consumer->enabled) {
134 ret = LTTNG_OK;
135 DBG3("Consumer is disabled");
136 goto error;
137 }
138
139 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
140 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
141
142 pthread_mutex_lock(sock->lock);
143 ret = lttcomm_send_unix_sock(*sock->fd, &msg, sizeof(msg));
144 if (ret < 0) {
145 /* Indicate that the consumer is probably closing at this point. */
146 DBG("send consumer destroy relayd command");
147 goto error_send;
148 }
149
150 /* Don't check the return value. The caller will do it. */
151 ret = consumer_recv_status_reply(sock);
152
153 DBG2("Consumer send destroy relayd command done");
154
155 error_send:
156 pthread_mutex_unlock(sock->lock);
157 error:
158 return ret;
159 }
160
161 /*
162 * For each consumer socket in the consumer output object, send a destroy
163 * relayd command.
164 */
165 void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
166 {
167 struct lttng_ht_iter iter;
168 struct consumer_socket *socket;
169
170 assert(consumer);
171
172 /* Destroy any relayd connection */
173 if (consumer->type == CONSUMER_DST_NET) {
174 rcu_read_lock();
175 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
176 node.node) {
177 int ret;
178
179 /* Send destroy relayd command */
180 ret = consumer_send_destroy_relayd(socket, consumer);
181 if (ret < 0) {
182 DBG("Unable to send destroy relayd command to consumer");
183 /* Continue since we MUST delete everything at this point. */
184 }
185 }
186 rcu_read_unlock();
187 }
188 }
189
190 /*
191 * From a consumer_data structure, allocate and add a consumer socket to the
192 * consumer output.
193 *
194 * Return 0 on success, else negative value on error
195 */
196 int consumer_create_socket(struct consumer_data *data,
197 struct consumer_output *output)
198 {
199 int ret = 0;
200 struct consumer_socket *socket;
201
202 assert(data);
203
204 if (output == NULL || data->cmd_sock < 0) {
205 /*
206 * Not an error. Possible there is simply not spawned consumer or it's
207 * disabled for the tracing session asking the socket.
208 */
209 goto error;
210 }
211
212 rcu_read_lock();
213 socket = consumer_find_socket(data->cmd_sock, output);
214 rcu_read_unlock();
215 if (socket == NULL) {
216 socket = consumer_allocate_socket(&data->cmd_sock);
217 if (socket == NULL) {
218 ret = -1;
219 goto error;
220 }
221
222 socket->registered = 0;
223 socket->lock = &data->lock;
224 rcu_read_lock();
225 consumer_add_socket(socket, output);
226 rcu_read_unlock();
227 }
228
229 socket->type = data->type;
230
231 DBG3("Consumer socket created (fd: %d) and added to output",
232 data->cmd_sock);
233
234 error:
235 return ret;
236 }
237
238 /*
239 * Return the consumer socket from the given consumer output with the right
240 * bitness. On error, returns NULL.
241 *
242 * The caller MUST acquire a rcu read side lock and keep it until the socket
243 * object reference is not needed anymore.
244 */
245 struct consumer_socket *consumer_find_socket_by_bitness(int bits,
246 struct consumer_output *consumer)
247 {
248 int consumer_fd;
249 struct consumer_socket *socket = NULL;
250
251 switch (bits) {
252 case 64:
253 consumer_fd = uatomic_read(&ust_consumerd64_fd);
254 break;
255 case 32:
256 consumer_fd = uatomic_read(&ust_consumerd32_fd);
257 break;
258 default:
259 assert(0);
260 goto end;
261 }
262
263 socket = consumer_find_socket(consumer_fd, consumer);
264 if (!socket) {
265 ERR("Consumer socket fd %d not found in consumer obj %p",
266 consumer_fd, consumer);
267 }
268
269 end:
270 return socket;
271 }
272
273 /*
274 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
275 * be acquired before calling this function and across use of the
276 * returned consumer_socket.
277 */
278 struct consumer_socket *consumer_find_socket(int key,
279 struct consumer_output *consumer)
280 {
281 struct lttng_ht_iter iter;
282 struct lttng_ht_node_ulong *node;
283 struct consumer_socket *socket = NULL;
284
285 /* Negative keys are lookup failures */
286 if (key < 0 || consumer == NULL) {
287 return NULL;
288 }
289
290 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
291 &iter);
292 node = lttng_ht_iter_get_node_ulong(&iter);
293 if (node != NULL) {
294 socket = caa_container_of(node, struct consumer_socket, node);
295 }
296
297 return socket;
298 }
299
300 /*
301 * Allocate a new consumer_socket and return the pointer.
302 */
303 struct consumer_socket *consumer_allocate_socket(int *fd)
304 {
305 struct consumer_socket *socket = NULL;
306
307 assert(fd);
308
309 socket = zmalloc(sizeof(struct consumer_socket));
310 if (socket == NULL) {
311 PERROR("zmalloc consumer socket");
312 goto error;
313 }
314
315 socket->fd = fd;
316 lttng_ht_node_init_ulong(&socket->node, *fd);
317
318 error:
319 return socket;
320 }
321
322 /*
323 * Add consumer socket to consumer output object. Read side lock must be
324 * acquired before calling this function.
325 */
326 void consumer_add_socket(struct consumer_socket *sock,
327 struct consumer_output *consumer)
328 {
329 assert(sock);
330 assert(consumer);
331
332 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
333 }
334
335 /*
336 * Delte consumer socket to consumer output object. Read side lock must be
337 * acquired before calling this function.
338 */
339 void consumer_del_socket(struct consumer_socket *sock,
340 struct consumer_output *consumer)
341 {
342 int ret;
343 struct lttng_ht_iter iter;
344
345 assert(sock);
346 assert(consumer);
347
348 iter.iter.node = &sock->node.node;
349 ret = lttng_ht_del(consumer->socks, &iter);
350 assert(!ret);
351 }
352
353 /*
354 * RCU destroy call function.
355 */
356 static void destroy_socket_rcu(struct rcu_head *head)
357 {
358 struct lttng_ht_node_ulong *node =
359 caa_container_of(head, struct lttng_ht_node_ulong, head);
360 struct consumer_socket *socket =
361 caa_container_of(node, struct consumer_socket, node);
362
363 free(socket);
364 }
365
366 /*
367 * Destroy and free socket pointer in a call RCU. Read side lock must be
368 * acquired before calling this function.
369 */
370 void consumer_destroy_socket(struct consumer_socket *sock)
371 {
372 assert(sock);
373
374 /*
375 * We DO NOT close the file descriptor here since it is global to the
376 * session daemon and is closed only if the consumer dies or a custom
377 * consumer was registered,
378 */
379 if (sock->registered) {
380 DBG3("Consumer socket was registered. Closing fd %d", *sock->fd);
381 lttcomm_close_unix_sock(*sock->fd);
382 }
383
384 call_rcu(&sock->node.head, destroy_socket_rcu);
385 }
386
387 /*
388 * Allocate and assign data to a consumer_output object.
389 *
390 * Return pointer to structure.
391 */
392 struct consumer_output *consumer_create_output(enum consumer_dst_type type)
393 {
394 struct consumer_output *output = NULL;
395
396 output = zmalloc(sizeof(struct consumer_output));
397 if (output == NULL) {
398 PERROR("zmalloc consumer_output");
399 goto error;
400 }
401
402 /* By default, consumer output is enabled */
403 output->enabled = 1;
404 output->type = type;
405 output->net_seq_index = (uint64_t) -1ULL;
406
407 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
408
409 error:
410 return output;
411 }
412
413 /*
414 * Iterate over the consumer output socket hash table and destroy them. The
415 * socket file descriptor are only closed if the consumer output was
416 * registered meaning it's an external consumer.
417 */
418 void consumer_destroy_output_sockets(struct consumer_output *obj)
419 {
420 struct lttng_ht_iter iter;
421 struct consumer_socket *socket;
422
423 if (!obj->socks) {
424 return;
425 }
426
427 rcu_read_lock();
428 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
429 consumer_del_socket(socket, obj);
430 consumer_destroy_socket(socket);
431 }
432 rcu_read_unlock();
433 }
434
435 /*
436 * Delete the consumer_output object from the list and free the ptr.
437 *
438 * Should *NOT* be called with RCU read-side lock held.
439 */
440 void consumer_destroy_output(struct consumer_output *obj)
441 {
442 if (obj == NULL) {
443 return;
444 }
445
446 consumer_destroy_output_sockets(obj);
447
448 if (obj->socks) {
449 /* Finally destroy HT */
450 ht_cleanup_push(obj->socks);
451 }
452
453 free(obj);
454 }
455
456 /*
457 * Copy consumer output and returned the newly allocated copy.
458 *
459 * Should *NOT* be called with RCU read-side lock held.
460 */
461 struct consumer_output *consumer_copy_output(struct consumer_output *obj)
462 {
463 int ret;
464 struct lttng_ht *tmp_ht_ptr;
465 struct consumer_output *output;
466
467 assert(obj);
468
469 output = consumer_create_output(obj->type);
470 if (output == NULL) {
471 goto error;
472 }
473 /* Avoid losing the HT reference after the memcpy() */
474 tmp_ht_ptr = output->socks;
475
476 memcpy(output, obj, sizeof(struct consumer_output));
477
478 /* Putting back the HT pointer and start copying socket(s). */
479 output->socks = tmp_ht_ptr;
480
481 ret = consumer_copy_sockets(output, obj);
482 if (ret < 0) {
483 goto malloc_error;
484 }
485
486 error:
487 return output;
488
489 malloc_error:
490 consumer_destroy_output(output);
491 return NULL;
492 }
493
494 /*
495 * Copy consumer sockets from src to dst.
496 *
497 * Return 0 on success or else a negative value.
498 */
499 int consumer_copy_sockets(struct consumer_output *dst,
500 struct consumer_output *src)
501 {
502 int ret = 0;
503 struct lttng_ht_iter iter;
504 struct consumer_socket *socket, *copy_sock;
505
506 assert(dst);
507 assert(src);
508
509 rcu_read_lock();
510 cds_lfht_for_each_entry(src->socks->ht, &iter.iter, socket, node.node) {
511 /* Ignore socket that are already there. */
512 copy_sock = consumer_find_socket(*socket->fd, dst);
513 if (copy_sock) {
514 continue;
515 }
516
517 /* Create new socket object. */
518 copy_sock = consumer_allocate_socket(socket->fd);
519 if (copy_sock == NULL) {
520 rcu_read_unlock();
521 ret = -ENOMEM;
522 goto error;
523 }
524
525 copy_sock->registered = socket->registered;
526 /*
527 * This is valid because this lock is shared accross all consumer
528 * object being the global lock of the consumer data structure of the
529 * session daemon.
530 */
531 copy_sock->lock = socket->lock;
532 consumer_add_socket(copy_sock, dst);
533 }
534 rcu_read_unlock();
535
536 error:
537 return ret;
538 }
539
540 /*
541 * Set network URI to the consumer output object.
542 *
543 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
544 * error.
545 */
546 int consumer_set_network_uri(struct consumer_output *obj,
547 struct lttng_uri *uri)
548 {
549 int ret;
550 char tmp_path[PATH_MAX];
551 char hostname[HOST_NAME_MAX];
552 struct lttng_uri *dst_uri = NULL;
553
554 /* Code flow error safety net. */
555 assert(obj);
556 assert(uri);
557
558 switch (uri->stype) {
559 case LTTNG_STREAM_CONTROL:
560 dst_uri = &obj->dst.net.control;
561 obj->dst.net.control_isset = 1;
562 if (uri->port == 0) {
563 /* Assign default port. */
564 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
565 } else {
566 if (obj->dst.net.data_isset && uri->port ==
567 obj->dst.net.data.port) {
568 ret = -LTTNG_ERR_INVALID;
569 goto error;
570 }
571 }
572 DBG3("Consumer control URI set with port %d", uri->port);
573 break;
574 case LTTNG_STREAM_DATA:
575 dst_uri = &obj->dst.net.data;
576 obj->dst.net.data_isset = 1;
577 if (uri->port == 0) {
578 /* Assign default port. */
579 uri->port = DEFAULT_NETWORK_DATA_PORT;
580 } else {
581 if (obj->dst.net.control_isset && uri->port ==
582 obj->dst.net.control.port) {
583 ret = -LTTNG_ERR_INVALID;
584 goto error;
585 }
586 }
587 DBG3("Consumer data URI set with port %d", uri->port);
588 break;
589 default:
590 ERR("Set network uri type unknown %d", uri->stype);
591 ret = -LTTNG_ERR_INVALID;
592 goto error;
593 }
594
595 ret = uri_compare(dst_uri, uri);
596 if (!ret) {
597 /* Same URI, don't touch it and return success. */
598 DBG3("URI network compare are the same");
599 goto equal;
600 }
601
602 /* URIs were not equal, replacing it. */
603 memset(dst_uri, 0, sizeof(struct lttng_uri));
604 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
605 obj->type = CONSUMER_DST_NET;
606
607 /* Handle subdir and add hostname in front. */
608 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
609 /* Get hostname to append it in the pathname */
610 ret = gethostname(hostname, sizeof(hostname));
611 if (ret < 0) {
612 PERROR("gethostname. Fallback on default localhost");
613 strncpy(hostname, "localhost", sizeof(hostname));
614 }
615 hostname[sizeof(hostname) - 1] = '\0';
616
617 /* Setup consumer subdir if none present in the control URI */
618 if (strlen(dst_uri->subdir) == 0) {
619 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
620 hostname, obj->subdir);
621 } else {
622 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
623 hostname, dst_uri->subdir);
624 }
625 if (ret < 0) {
626 PERROR("snprintf set consumer uri subdir");
627 ret = -LTTNG_ERR_NOMEM;
628 goto error;
629 }
630
631 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
632 DBG3("Consumer set network uri subdir path %s", tmp_path);
633 }
634
635 return 0;
636 equal:
637 return 1;
638 error:
639 return ret;
640 }
641
642 /*
643 * Send file descriptor to consumer via sock.
644 */
645 int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
646 {
647 int ret;
648
649 assert(fds);
650 assert(sock);
651 assert(sock->fd);
652 assert(nb_fd > 0);
653
654 ret = lttcomm_send_fds_unix_sock(*sock->fd, fds, nb_fd);
655 if (ret < 0) {
656 /* The above call will print a PERROR on error. */
657 DBG("Error when sending consumer fds on sock %d", *sock->fd);
658 goto error;
659 }
660
661 ret = consumer_recv_status_reply(sock);
662
663 error:
664 return ret;
665 }
666
667 /*
668 * Consumer send communication message structure to consumer.
669 */
670 int consumer_send_msg(struct consumer_socket *sock,
671 struct lttcomm_consumer_msg *msg)
672 {
673 int ret;
674
675 assert(msg);
676 assert(sock);
677 assert(sock->fd);
678
679 ret = lttcomm_send_unix_sock(*sock->fd, msg,
680 sizeof(struct lttcomm_consumer_msg));
681 if (ret < 0) {
682 /* The above call will print a PERROR on error. */
683 DBG("Error when sending consumer channel on sock %d", *sock->fd);
684 goto error;
685 }
686
687 ret = consumer_recv_status_reply(sock);
688
689 error:
690 return ret;
691 }
692
693 /*
694 * Consumer send channel communication message structure to consumer.
695 */
696 int consumer_send_channel(struct consumer_socket *sock,
697 struct lttcomm_consumer_msg *msg)
698 {
699 int ret;
700
701 assert(msg);
702 assert(sock);
703 assert(sock->fd);
704
705 ret = lttcomm_send_unix_sock(*sock->fd, msg,
706 sizeof(struct lttcomm_consumer_msg));
707 if (ret < 0) {
708 /* The above call will print a PERROR on error. */
709 DBG("Error when sending consumer channel on sock %d", *sock->fd);
710 goto error;
711 }
712
713 ret = consumer_recv_status_reply(sock);
714
715 error:
716 return ret;
717 }
718
719 /*
720 * Populate the given consumer msg structure with the ask_channel command
721 * information.
722 */
723 void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
724 uint64_t subbuf_size,
725 uint64_t num_subbuf,
726 int overwrite,
727 unsigned int switch_timer_interval,
728 unsigned int read_timer_interval,
729 int output,
730 int type,
731 uint64_t session_id,
732 const char *pathname,
733 const char *name,
734 uid_t uid,
735 gid_t gid,
736 uint64_t relayd_id,
737 uint64_t key,
738 unsigned char *uuid,
739 uint32_t chan_id,
740 uint64_t tracefile_size,
741 uint64_t tracefile_count,
742 uint64_t session_id_per_pid,
743 unsigned int monitor,
744 uint32_t ust_app_uid)
745 {
746 assert(msg);
747
748 /* Zeroed structure */
749 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
750
751 msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION;
752 msg->u.ask_channel.subbuf_size = subbuf_size;
753 msg->u.ask_channel.num_subbuf = num_subbuf ;
754 msg->u.ask_channel.overwrite = overwrite;
755 msg->u.ask_channel.switch_timer_interval = switch_timer_interval;
756 msg->u.ask_channel.read_timer_interval = read_timer_interval;
757 msg->u.ask_channel.output = output;
758 msg->u.ask_channel.type = type;
759 msg->u.ask_channel.session_id = session_id;
760 msg->u.ask_channel.session_id_per_pid = session_id_per_pid;
761 msg->u.ask_channel.uid = uid;
762 msg->u.ask_channel.gid = gid;
763 msg->u.ask_channel.relayd_id = relayd_id;
764 msg->u.ask_channel.key = key;
765 msg->u.ask_channel.chan_id = chan_id;
766 msg->u.ask_channel.tracefile_size = tracefile_size;
767 msg->u.ask_channel.tracefile_count = tracefile_count;
768 msg->u.ask_channel.monitor = monitor;
769 msg->u.ask_channel.ust_app_uid = ust_app_uid;
770
771 memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
772
773 if (pathname) {
774 strncpy(msg->u.ask_channel.pathname, pathname,
775 sizeof(msg->u.ask_channel.pathname));
776 msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname)-1] = '\0';
777 }
778
779 strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name));
780 msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0';
781 }
782
783 /*
784 * Init channel communication message structure.
785 */
786 void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
787 enum lttng_consumer_command cmd,
788 uint64_t channel_key,
789 uint64_t session_id,
790 const char *pathname,
791 uid_t uid,
792 gid_t gid,
793 uint64_t relayd_id,
794 const char *name,
795 unsigned int nb_init_streams,
796 enum lttng_event_output output,
797 int type,
798 uint64_t tracefile_size,
799 uint64_t tracefile_count,
800 unsigned int monitor)
801 {
802 assert(msg);
803
804 /* Zeroed structure */
805 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
806
807 /* Send channel */
808 msg->cmd_type = cmd;
809 msg->u.channel.channel_key = channel_key;
810 msg->u.channel.session_id = session_id;
811 msg->u.channel.uid = uid;
812 msg->u.channel.gid = gid;
813 msg->u.channel.relayd_id = relayd_id;
814 msg->u.channel.nb_init_streams = nb_init_streams;
815 msg->u.channel.output = output;
816 msg->u.channel.type = type;
817 msg->u.channel.tracefile_size = tracefile_size;
818 msg->u.channel.tracefile_count = tracefile_count;
819 msg->u.channel.monitor = monitor;
820
821 strncpy(msg->u.channel.pathname, pathname,
822 sizeof(msg->u.channel.pathname));
823 msg->u.channel.pathname[sizeof(msg->u.channel.pathname) - 1] = '\0';
824
825 strncpy(msg->u.channel.name, name, sizeof(msg->u.channel.name));
826 msg->u.channel.name[sizeof(msg->u.channel.name) - 1] = '\0';
827 }
828
829 /*
830 * Init stream communication message structure.
831 */
832 void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
833 enum lttng_consumer_command cmd,
834 uint64_t channel_key,
835 uint64_t stream_key,
836 int cpu)
837 {
838 assert(msg);
839
840 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
841
842 msg->cmd_type = cmd;
843 msg->u.stream.channel_key = channel_key;
844 msg->u.stream.stream_key = stream_key;
845 msg->u.stream.cpu = cpu;
846 }
847
848 /*
849 * Send stream communication structure to the consumer.
850 */
851 int consumer_send_stream(struct consumer_socket *sock,
852 struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
853 int *fds, size_t nb_fd)
854 {
855 int ret;
856
857 assert(msg);
858 assert(dst);
859 assert(sock);
860 assert(sock->fd);
861 assert(fds);
862
863 /* Send on socket */
864 ret = lttcomm_send_unix_sock(*sock->fd, msg,
865 sizeof(struct lttcomm_consumer_msg));
866 if (ret < 0) {
867 /* The above call will print a PERROR on error. */
868 DBG("Error when sending consumer stream on sock %d", *sock->fd);
869 goto error;
870 }
871
872 ret = consumer_recv_status_reply(sock);
873 if (ret < 0) {
874 goto error;
875 }
876
877 ret = consumer_send_fds(sock, fds, nb_fd);
878 if (ret < 0) {
879 goto error;
880 }
881
882 error:
883 return ret;
884 }
885
886 /*
887 * Send relayd socket to consumer associated with a session name.
888 *
889 * On success return positive value. On error, negative value.
890 */
891 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
892 struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
893 enum lttng_stream_type type, uint64_t session_id)
894 {
895 int ret;
896 struct lttcomm_consumer_msg msg;
897
898 /* Code flow error. Safety net. */
899 assert(rsock);
900 assert(consumer);
901 assert(consumer_sock);
902 assert(consumer_sock->fd);
903
904 /* Bail out if consumer is disabled */
905 if (!consumer->enabled) {
906 ret = LTTNG_OK;
907 goto error;
908 }
909
910 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
911 /*
912 * Assign network consumer output index using the temporary consumer since
913 * this call should only be made from within a set_consumer_uri() function
914 * call in the session daemon.
915 */
916 msg.u.relayd_sock.net_index = consumer->net_seq_index;
917 msg.u.relayd_sock.type = type;
918 msg.u.relayd_sock.session_id = session_id;
919 memcpy(&msg.u.relayd_sock.sock, rsock, sizeof(msg.u.relayd_sock.sock));
920
921 DBG3("Sending relayd sock info to consumer on %d", *consumer_sock->fd);
922 ret = lttcomm_send_unix_sock(*consumer_sock->fd, &msg, sizeof(msg));
923 if (ret < 0) {
924 /* The above call will print a PERROR on error. */
925 DBG("Error when sending relayd sockets on sock %d", rsock->sock.fd);
926 goto error;
927 }
928
929 ret = consumer_recv_status_reply(consumer_sock);
930 if (ret < 0) {
931 goto error;
932 }
933
934 DBG3("Sending relayd socket file descriptor to consumer");
935 ret = consumer_send_fds(consumer_sock, &rsock->sock.fd, 1);
936 if (ret < 0) {
937 goto error;
938 }
939
940 DBG2("Consumer relayd socket sent");
941
942 error:
943 return ret;
944 }
945
946 /*
947 * Set consumer subdirectory using the session name and a generated datetime if
948 * needed. This is appended to the current subdirectory.
949 */
950 int consumer_set_subdir(struct consumer_output *consumer,
951 const char *session_name)
952 {
953 int ret = 0;
954 unsigned int have_default_name = 0;
955 char datetime[16], tmp_path[PATH_MAX];
956 time_t rawtime;
957 struct tm *timeinfo;
958
959 assert(consumer);
960 assert(session_name);
961
962 memset(tmp_path, 0, sizeof(tmp_path));
963
964 /* Flag if we have a default session. */
965 if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
966 strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
967 have_default_name = 1;
968 } else {
969 /* Get date and time for session path */
970 time(&rawtime);
971 timeinfo = localtime(&rawtime);
972 strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
973 }
974
975 if (have_default_name) {
976 ret = snprintf(tmp_path, sizeof(tmp_path),
977 "%s/%s", consumer->subdir, session_name);
978 } else {
979 ret = snprintf(tmp_path, sizeof(tmp_path),
980 "%s/%s-%s/", consumer->subdir, session_name, datetime);
981 }
982 if (ret < 0) {
983 PERROR("snprintf session name date");
984 goto error;
985 }
986
987 strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
988 DBG2("Consumer subdir set to %s", consumer->subdir);
989
990 error:
991 return ret;
992 }
993
994 /*
995 * Ask the consumer if the data is ready to read (NOT pending) for the specific
996 * session id.
997 *
998 * This function has a different behavior with the consumer i.e. that it waits
999 * for a reply from the consumer if yes or no the data is pending.
1000 */
1001 int consumer_is_data_pending(uint64_t session_id,
1002 struct consumer_output *consumer)
1003 {
1004 int ret;
1005 int32_t ret_code = 0; /* Default is that the data is NOT pending */
1006 struct consumer_socket *socket;
1007 struct lttng_ht_iter iter;
1008 struct lttcomm_consumer_msg msg;
1009
1010 assert(consumer);
1011
1012 msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
1013
1014 msg.u.data_pending.session_id = session_id;
1015
1016 DBG3("Consumer data pending for id %" PRIu64, session_id);
1017
1018 /* Send command for each consumer */
1019 rcu_read_lock();
1020 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
1021 node.node) {
1022 /* Code flow error */
1023 assert(socket->fd);
1024
1025 pthread_mutex_lock(socket->lock);
1026
1027 ret = lttcomm_send_unix_sock(*socket->fd, &msg, sizeof(msg));
1028 if (ret < 0) {
1029 /* The above call will print a PERROR on error. */
1030 DBG("Error on consumer is data pending on sock %d", *socket->fd);
1031 pthread_mutex_unlock(socket->lock);
1032 goto error_unlock;
1033 }
1034
1035 /*
1036 * No need for a recv reply status because the answer to the command is
1037 * the reply status message.
1038 */
1039
1040 ret = lttcomm_recv_unix_sock(*socket->fd, &ret_code, sizeof(ret_code));
1041 if (ret <= 0) {
1042 if (ret == 0) {
1043 /* Orderly shutdown. Don't return 0 which means success. */
1044 ret = -1;
1045 }
1046 /* The above call will print a PERROR on error. */
1047 DBG("Error on recv consumer is data pending on sock %d", *socket->fd);
1048 pthread_mutex_unlock(socket->lock);
1049 goto error_unlock;
1050 }
1051
1052 pthread_mutex_unlock(socket->lock);
1053
1054 if (ret_code == 1) {
1055 break;
1056 }
1057 }
1058 rcu_read_unlock();
1059
1060 DBG("Consumer data is %s pending for session id %" PRIu64,
1061 ret_code == 1 ? "" : "NOT", session_id);
1062 return ret_code;
1063
1064 error_unlock:
1065 rcu_read_unlock();
1066 return -1;
1067 }
1068
1069 /*
1070 * Send a flush command to consumer using the given channel key.
1071 *
1072 * Return 0 on success else a negative value.
1073 */
1074 int consumer_flush_channel(struct consumer_socket *socket, uint64_t key)
1075 {
1076 int ret;
1077 struct lttcomm_consumer_msg msg;
1078
1079 assert(socket);
1080 assert(socket->fd);
1081
1082 DBG2("Consumer flush channel key %" PRIu64, key);
1083
1084 msg.cmd_type = LTTNG_CONSUMER_FLUSH_CHANNEL;
1085 msg.u.flush_channel.key = key;
1086
1087 pthread_mutex_lock(socket->lock);
1088 health_code_update();
1089
1090 ret = consumer_send_msg(socket, &msg);
1091 if (ret < 0) {
1092 goto end;
1093 }
1094
1095 end:
1096 health_code_update();
1097 pthread_mutex_unlock(socket->lock);
1098 return ret;
1099 }
1100
1101 /*
1102 * Send a close metdata command to consumer using the given channel key.
1103 *
1104 * Return 0 on success else a negative value.
1105 */
1106 int consumer_close_metadata(struct consumer_socket *socket,
1107 uint64_t metadata_key)
1108 {
1109 int ret;
1110 struct lttcomm_consumer_msg msg;
1111
1112 assert(socket);
1113 assert(socket->fd);
1114
1115 DBG2("Consumer close metadata channel key %" PRIu64, metadata_key);
1116
1117 msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA;
1118 msg.u.close_metadata.key = metadata_key;
1119
1120 pthread_mutex_lock(socket->lock);
1121 health_code_update();
1122
1123 ret = consumer_send_msg(socket, &msg);
1124 if (ret < 0) {
1125 goto end;
1126 }
1127
1128 end:
1129 health_code_update();
1130 pthread_mutex_unlock(socket->lock);
1131 return ret;
1132 }
1133
1134 /*
1135 * Send a setup metdata command to consumer using the given channel key.
1136 *
1137 * Return 0 on success else a negative value.
1138 */
1139 int consumer_setup_metadata(struct consumer_socket *socket,
1140 uint64_t metadata_key)
1141 {
1142 int ret;
1143 struct lttcomm_consumer_msg msg;
1144
1145 assert(socket);
1146 assert(socket->fd);
1147
1148 DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key);
1149
1150 msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA;
1151 msg.u.setup_metadata.key = metadata_key;
1152
1153 pthread_mutex_lock(socket->lock);
1154 health_code_update();
1155
1156 ret = consumer_send_msg(socket, &msg);
1157 if (ret < 0) {
1158 goto end;
1159 }
1160
1161 end:
1162 health_code_update();
1163 pthread_mutex_unlock(socket->lock);
1164 return ret;
1165 }
1166
1167 /*
1168 * Send metadata string to consumer. Socket lock MUST be acquired.
1169 *
1170 * Return 0 on success else a negative value.
1171 */
1172 int consumer_push_metadata(struct consumer_socket *socket,
1173 uint64_t metadata_key, char *metadata_str, size_t len,
1174 size_t target_offset)
1175 {
1176 int ret;
1177 struct lttcomm_consumer_msg msg;
1178
1179 assert(socket);
1180 assert(socket->fd);
1181
1182 DBG2("Consumer push metadata to consumer socket %d", *socket->fd);
1183
1184 msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA;
1185 msg.u.push_metadata.key = metadata_key;
1186 msg.u.push_metadata.target_offset = target_offset;
1187 msg.u.push_metadata.len = len;
1188
1189 health_code_update();
1190 ret = consumer_send_msg(socket, &msg);
1191 if (ret < 0 || len == 0) {
1192 goto end;
1193 }
1194
1195 DBG3("Consumer pushing metadata on sock %d of len %zu", *socket->fd, len);
1196
1197 ret = lttcomm_send_unix_sock(*socket->fd, metadata_str, len);
1198 if (ret < 0) {
1199 goto end;
1200 }
1201
1202 health_code_update();
1203 ret = consumer_recv_status_reply(socket);
1204 if (ret < 0) {
1205 goto end;
1206 }
1207
1208 end:
1209 health_code_update();
1210 return ret;
1211 }
1212
1213 /*
1214 * Ask the consumer to snapshot a specific channel using the key.
1215 *
1216 * Return 0 on success or else a negative error.
1217 */
1218 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
1219 struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
1220 const char *session_path, int wait, int max_stream_size)
1221 {
1222 int ret;
1223 struct lttcomm_consumer_msg msg;
1224
1225 assert(socket);
1226 assert(socket->fd);
1227 assert(output);
1228 assert(output->consumer);
1229
1230 DBG("Consumer snapshot channel key %" PRIu64, key);
1231
1232 memset(&msg, 0, sizeof(msg));
1233 msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
1234 msg.u.snapshot_channel.key = key;
1235 msg.u.snapshot_channel.max_stream_size = max_stream_size;
1236 msg.u.snapshot_channel.metadata = metadata;
1237
1238 if (output->consumer->type == CONSUMER_DST_NET) {
1239 msg.u.snapshot_channel.relayd_id = output->consumer->net_seq_index;
1240 msg.u.snapshot_channel.use_relayd = 1;
1241 ret = snprintf(msg.u.snapshot_channel.pathname,
1242 sizeof(msg.u.snapshot_channel.pathname),
1243 "%s/%s-%s-%" PRIu64 "%s", output->consumer->subdir,
1244 output->name, output->datetime, output->nb_snapshot,
1245 session_path);
1246 if (ret < 0) {
1247 ret = -LTTNG_ERR_NOMEM;
1248 goto error;
1249 }
1250 } else {
1251 ret = snprintf(msg.u.snapshot_channel.pathname,
1252 sizeof(msg.u.snapshot_channel.pathname),
1253 "%s/%s-%s-%" PRIu64 "%s", output->consumer->dst.trace_path,
1254 output->name, output->datetime, output->nb_snapshot,
1255 session_path);
1256 if (ret < 0) {
1257 ret = -LTTNG_ERR_NOMEM;
1258 goto error;
1259 }
1260 msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL;
1261
1262 /* Create directory. Ignore if exist. */
1263 ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname,
1264 S_IRWXU | S_IRWXG, uid, gid);
1265 if (ret < 0) {
1266 if (ret != -EEXIST) {
1267 ERR("Trace directory creation error");
1268 goto error;
1269 }
1270 }
1271 }
1272
1273 health_code_update();
1274 ret = consumer_send_msg(socket, &msg);
1275 if (ret < 0) {
1276 goto error;
1277 }
1278
1279 error:
1280 health_code_update();
1281 return ret;
1282 }
This page took 0.093643 seconds and 6 git commands to generate.