Update version to v2.1.0
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _GNU_SOURCE
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
24#include <sys/types.h>
25#include <unistd.h>
26
27#include <common/common.h>
28#include <common/defaults.h>
29#include <common/uri.h>
30
31#include "consumer.h"
32
f50f23d9
DG
33/*
34 * Receive a reply command status message from the consumer. Consumer socket
35 * lock MUST be acquired before calling this function.
36 *
37 * Return 0 on success, -1 on recv error or a negative lttng error code which
38 * was possibly returned by the consumer.
39 */
40int consumer_recv_status_reply(struct consumer_socket *sock)
41{
42 int ret;
43 struct lttcomm_consumer_status_msg reply;
44
45 assert(sock);
46
47 ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
a6cd2b97
DG
48 if (ret <= 0) {
49 if (ret == 0) {
50 /* Orderly shutdown. Don't return 0 which means success. */
51 ret = -1;
52 }
3448e266
DG
53 /* The above call will print a PERROR on error. */
54 DBG("Fail to receive status reply on sock %d", sock->fd);
f50f23d9
DG
55 goto end;
56 }
57
58 if (reply.ret_code == LTTNG_OK) {
59 /* All good. */
60 ret = 0;
61 } else {
62 ret = -reply.ret_code;
3448e266 63 DBG("Consumer ret code %d", reply.ret_code);
f50f23d9
DG
64 }
65
66end:
67 return ret;
68}
69
2f77fc4b
DG
70/*
71 * Send destroy relayd command to consumer.
72 *
73 * On success return positive value. On error, negative value.
74 */
75int consumer_send_destroy_relayd(struct consumer_socket *sock,
76 struct consumer_output *consumer)
77{
78 int ret;
79 struct lttcomm_consumer_msg msg;
80
81 assert(consumer);
82 assert(sock);
83
84 DBG2("Sending destroy relayd command to consumer...");
85
86 /* Bail out if consumer is disabled */
87 if (!consumer->enabled) {
f73fabfd 88 ret = LTTNG_OK;
2f77fc4b
DG
89 DBG3("Consumer is disabled");
90 goto error;
91 }
92
93 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
94 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
95
96 pthread_mutex_lock(sock->lock);
97 ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
2f77fc4b 98 if (ret < 0) {
c5c45efa
DG
99 /* Indicate that the consumer is probably closing at this point. */
100 DBG("send consumer destroy relayd command");
f50f23d9 101 goto error_send;
2f77fc4b
DG
102 }
103
f50f23d9
DG
104 /* Don't check the return value. The caller will do it. */
105 ret = consumer_recv_status_reply(sock);
106
2f77fc4b
DG
107 DBG2("Consumer send destroy relayd command done");
108
f50f23d9
DG
109error_send:
110 pthread_mutex_unlock(sock->lock);
2f77fc4b
DG
111error:
112 return ret;
113}
114
115/*
116 * For each consumer socket in the consumer output object, send a destroy
117 * relayd command.
118 */
119void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
120{
2f77fc4b
DG
121 struct lttng_ht_iter iter;
122 struct consumer_socket *socket;
123
124 assert(consumer);
125
126 /* Destroy any relayd connection */
127 if (consumer && consumer->type == CONSUMER_DST_NET) {
128 rcu_read_lock();
129 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
130 node.node) {
c617c0c6
MD
131 int ret;
132
2f77fc4b
DG
133 /* Send destroy relayd command */
134 ret = consumer_send_destroy_relayd(socket, consumer);
135 if (ret < 0) {
c5c45efa 136 DBG("Unable to send destroy relayd command to consumer");
2f77fc4b
DG
137 /* Continue since we MUST delete everything at this point. */
138 }
139 }
140 rcu_read_unlock();
141 }
142}
143
a4b92340
DG
144/*
145 * From a consumer_data structure, allocate and add a consumer socket to the
146 * consumer output.
147 *
148 * Return 0 on success, else negative value on error
149 */
150int consumer_create_socket(struct consumer_data *data,
151 struct consumer_output *output)
152{
153 int ret = 0;
154 struct consumer_socket *socket;
155
156 assert(data);
157
158 if (output == NULL || data->cmd_sock < 0) {
159 /*
160 * Not an error. Possible there is simply not spawned consumer or it's
161 * disabled for the tracing session asking the socket.
162 */
163 goto error;
164 }
165
166 rcu_read_lock();
167 socket = consumer_find_socket(data->cmd_sock, output);
168 rcu_read_unlock();
169 if (socket == NULL) {
170 socket = consumer_allocate_socket(data->cmd_sock);
171 if (socket == NULL) {
172 ret = -1;
173 goto error;
174 }
175
2f77fc4b 176 socket->registered = 0;
a4b92340
DG
177 socket->lock = &data->lock;
178 rcu_read_lock();
179 consumer_add_socket(socket, output);
180 rcu_read_unlock();
181 }
182
183 DBG3("Consumer socket created (fd: %d) and added to output",
184 data->cmd_sock);
185
186error:
187 return ret;
188}
189
173af62f
DG
190/*
191 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
192 * be acquired before calling this function and across use of the
193 * returned consumer_socket.
194 */
195struct consumer_socket *consumer_find_socket(int key,
196 struct consumer_output *consumer)
197{
198 struct lttng_ht_iter iter;
199 struct lttng_ht_node_ulong *node;
200 struct consumer_socket *socket = NULL;
201
202 /* Negative keys are lookup failures */
a4b92340 203 if (key < 0 || consumer == NULL) {
173af62f
DG
204 return NULL;
205 }
206
207 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
208 &iter);
209 node = lttng_ht_iter_get_node_ulong(&iter);
210 if (node != NULL) {
211 socket = caa_container_of(node, struct consumer_socket, node);
212 }
213
214 return socket;
215}
216
217/*
218 * Allocate a new consumer_socket and return the pointer.
219 */
220struct consumer_socket *consumer_allocate_socket(int fd)
221{
222 struct consumer_socket *socket = NULL;
223
224 socket = zmalloc(sizeof(struct consumer_socket));
225 if (socket == NULL) {
226 PERROR("zmalloc consumer socket");
227 goto error;
228 }
229
230 socket->fd = fd;
231 lttng_ht_node_init_ulong(&socket->node, fd);
232
233error:
234 return socket;
235}
236
237/*
238 * Add consumer socket to consumer output object. Read side lock must be
239 * acquired before calling this function.
240 */
241void consumer_add_socket(struct consumer_socket *sock,
242 struct consumer_output *consumer)
243{
244 assert(sock);
245 assert(consumer);
246
247 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
248}
249
250/*
251 * Delte consumer socket to consumer output object. Read side lock must be
252 * acquired before calling this function.
253 */
254void consumer_del_socket(struct consumer_socket *sock,
255 struct consumer_output *consumer)
256{
257 int ret;
258 struct lttng_ht_iter iter;
259
260 assert(sock);
261 assert(consumer);
262
263 iter.iter.node = &sock->node.node;
264 ret = lttng_ht_del(consumer->socks, &iter);
265 assert(!ret);
266}
267
268/*
269 * RCU destroy call function.
270 */
271static void destroy_socket_rcu(struct rcu_head *head)
272{
273 struct lttng_ht_node_ulong *node =
274 caa_container_of(head, struct lttng_ht_node_ulong, head);
275 struct consumer_socket *socket =
276 caa_container_of(node, struct consumer_socket, node);
277
278 free(socket);
279}
280
281/*
282 * Destroy and free socket pointer in a call RCU. Read side lock must be
283 * acquired before calling this function.
284 */
285void consumer_destroy_socket(struct consumer_socket *sock)
286{
287 assert(sock);
288
289 /*
290 * We DO NOT close the file descriptor here since it is global to the
2f77fc4b
DG
291 * session daemon and is closed only if the consumer dies or a custom
292 * consumer was registered,
173af62f 293 */
2f77fc4b
DG
294 if (sock->registered) {
295 DBG3("Consumer socket was registered. Closing fd %d", sock->fd);
296 lttcomm_close_unix_sock(sock->fd);
297 }
173af62f
DG
298
299 call_rcu(&sock->node.head, destroy_socket_rcu);
300}
301
00e2e675
DG
302/*
303 * Allocate and assign data to a consumer_output object.
304 *
305 * Return pointer to structure.
306 */
307struct consumer_output *consumer_create_output(enum consumer_dst_type type)
308{
309 struct consumer_output *output = NULL;
310
311 output = zmalloc(sizeof(struct consumer_output));
312 if (output == NULL) {
313 PERROR("zmalloc consumer_output");
314 goto error;
315 }
316
317 /* By default, consumer output is enabled */
318 output->enabled = 1;
319 output->type = type;
320 output->net_seq_index = -1;
173af62f
DG
321
322 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675
DG
323
324error:
325 return output;
326}
327
328/*
329 * Delete the consumer_output object from the list and free the ptr.
330 */
331void consumer_destroy_output(struct consumer_output *obj)
332{
333 if (obj == NULL) {
334 return;
335 }
336
173af62f
DG
337 if (obj->socks) {
338 struct lttng_ht_iter iter;
339 struct consumer_socket *socket;
340
2f77fc4b 341 rcu_read_lock();
173af62f 342 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
2f77fc4b 343 consumer_del_socket(socket, obj);
173af62f
DG
344 consumer_destroy_socket(socket);
345 }
2f77fc4b
DG
346 rcu_read_unlock();
347
348 /* Finally destroy HT */
349 lttng_ht_destroy(obj->socks);
00e2e675 350 }
173af62f 351
00e2e675
DG
352 free(obj);
353}
354
355/*
356 * Copy consumer output and returned the newly allocated copy.
357 */
358struct consumer_output *consumer_copy_output(struct consumer_output *obj)
359{
09a90bcd 360 struct lttng_ht *tmp_ht_ptr;
173af62f
DG
361 struct lttng_ht_iter iter;
362 struct consumer_socket *socket, *copy_sock;
00e2e675
DG
363 struct consumer_output *output;
364
365 assert(obj);
366
367 output = consumer_create_output(obj->type);
368 if (output == NULL) {
369 goto error;
370 }
09a90bcd
DG
371 /* Avoid losing the HT reference after the memcpy() */
372 tmp_ht_ptr = output->socks;
00e2e675
DG
373
374 memcpy(output, obj, sizeof(struct consumer_output));
375
09a90bcd
DG
376 /* Putting back the HT pointer and start copying socket(s). */
377 output->socks = tmp_ht_ptr;
173af62f
DG
378
379 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
380 /* Create new socket object. */
381 copy_sock = consumer_allocate_socket(socket->fd);
382 if (copy_sock == NULL) {
383 goto malloc_error;
384 }
385
09a90bcd 386 copy_sock->registered = socket->registered;
173af62f
DG
387 copy_sock->lock = socket->lock;
388 consumer_add_socket(copy_sock, output);
389 }
390
00e2e675
DG
391error:
392 return output;
173af62f
DG
393
394malloc_error:
395 consumer_destroy_output(output);
396 return NULL;
00e2e675
DG
397}
398
399/*
400 * Set network URI to the consumer output object.
401 *
ad20f474
DG
402 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
403 * error.
00e2e675
DG
404 */
405int consumer_set_network_uri(struct consumer_output *obj,
406 struct lttng_uri *uri)
407{
408 int ret;
409 char tmp_path[PATH_MAX];
410 char hostname[HOST_NAME_MAX];
411 struct lttng_uri *dst_uri = NULL;
412
413 /* Code flow error safety net. */
414 assert(obj);
415 assert(uri);
416
417 switch (uri->stype) {
418 case LTTNG_STREAM_CONTROL:
419 dst_uri = &obj->dst.net.control;
420 obj->dst.net.control_isset = 1;
421 if (uri->port == 0) {
422 /* Assign default port. */
423 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
424 }
ad20f474 425 DBG3("Consumer control URI set with port %d", uri->port);
00e2e675
DG
426 break;
427 case LTTNG_STREAM_DATA:
428 dst_uri = &obj->dst.net.data;
429 obj->dst.net.data_isset = 1;
430 if (uri->port == 0) {
431 /* Assign default port. */
432 uri->port = DEFAULT_NETWORK_DATA_PORT;
433 }
ad20f474 434 DBG3("Consumer data URI set with port %d", uri->port);
00e2e675
DG
435 break;
436 default:
437 ERR("Set network uri type unknown %d", uri->stype);
438 goto error;
439 }
440
441 ret = uri_compare(dst_uri, uri);
442 if (!ret) {
443 /* Same URI, don't touch it and return success. */
444 DBG3("URI network compare are the same");
ad20f474 445 goto equal;
00e2e675
DG
446 }
447
448 /* URIs were not equal, replacing it. */
449 memset(dst_uri, 0, sizeof(struct lttng_uri));
450 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
451 obj->type = CONSUMER_DST_NET;
452
453 /* Handle subdir and add hostname in front. */
454 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
455 /* Get hostname to append it in the pathname */
456 ret = gethostname(hostname, sizeof(hostname));
457 if (ret < 0) {
458 PERROR("gethostname. Fallback on default localhost");
459 strncpy(hostname, "localhost", sizeof(hostname));
460 }
461 hostname[sizeof(hostname) - 1] = '\0';
462
463 /* Setup consumer subdir if none present in the control URI */
464 if (strlen(dst_uri->subdir) == 0) {
465 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
466 hostname, obj->subdir);
467 } else {
468 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
469 hostname, dst_uri->subdir);
470 }
471 if (ret < 0) {
472 PERROR("snprintf set consumer uri subdir");
473 goto error;
474 }
475
476 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
477 DBG3("Consumer set network uri subdir path %s", tmp_path);
478 }
479
00e2e675 480 return 0;
ad20f474
DG
481equal:
482 return 1;
00e2e675
DG
483error:
484 return -1;
485}
486
487/*
488 * Send file descriptor to consumer via sock.
489 */
f50f23d9 490int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
00e2e675
DG
491{
492 int ret;
493
494 assert(fds);
f50f23d9 495 assert(sock);
00e2e675
DG
496 assert(nb_fd > 0);
497
f50f23d9 498 ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
00e2e675 499 if (ret < 0) {
3448e266
DG
500 /* The above call will print a PERROR on error. */
501 DBG("Error when sending consumer fds on sock %d", sock->fd);
00e2e675
DG
502 goto error;
503 }
504
f50f23d9
DG
505 ret = consumer_recv_status_reply(sock);
506
00e2e675
DG
507error:
508 return ret;
509}
510
511/*
512 * Consumer send channel communication message structure to consumer.
513 */
f50f23d9
DG
514int consumer_send_channel(struct consumer_socket *sock,
515 struct lttcomm_consumer_msg *msg)
00e2e675
DG
516{
517 int ret;
518
519 assert(msg);
f50f23d9
DG
520 assert(sock);
521 assert(sock->fd >= 0);
00e2e675 522
f50f23d9 523 ret = lttcomm_send_unix_sock(sock->fd, msg,
00e2e675
DG
524 sizeof(struct lttcomm_consumer_msg));
525 if (ret < 0) {
3448e266
DG
526 /* The above call will print a PERROR on error. */
527 DBG("Error when sending consumer channel on sock %d", sock->fd);
00e2e675
DG
528 goto error;
529 }
530
f50f23d9
DG
531 ret = consumer_recv_status_reply(sock);
532
00e2e675
DG
533error:
534 return ret;
535}
536
537/*
538 * Init channel communication message structure.
539 */
540void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
541 enum lttng_consumer_command cmd,
542 int channel_key,
543 uint64_t max_sb_size,
544 uint64_t mmap_len,
c30aaa51
MD
545 const char *name,
546 unsigned int nb_init_streams)
00e2e675
DG
547{
548 assert(msg);
549
550 /* TODO: Args validation */
551
552 /* Zeroed structure */
553 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
554
555 /* Send channel */
556 msg->cmd_type = cmd;
557 msg->u.channel.channel_key = channel_key;
558 msg->u.channel.max_sb_size = max_sb_size;
559 msg->u.channel.mmap_len = mmap_len;
c30aaa51 560 msg->u.channel.nb_init_streams = nb_init_streams;
00e2e675
DG
561}
562
563/*
564 * Init stream communication message structure.
565 */
566void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
567 enum lttng_consumer_command cmd,
568 int channel_key,
569 int stream_key,
570 uint32_t state,
571 enum lttng_event_output output,
572 uint64_t mmap_len,
573 uid_t uid,
574 gid_t gid,
575 int net_index,
576 unsigned int metadata_flag,
577 const char *name,
ca22feea
DG
578 const char *pathname,
579 unsigned int session_id)
00e2e675
DG
580{
581 assert(msg);
582
583 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
584
585 /* TODO: Args validation */
586
587 msg->cmd_type = cmd;
588 msg->u.stream.channel_key = channel_key;
589 msg->u.stream.stream_key = stream_key;
590 msg->u.stream.state = state;
591 msg->u.stream.output = output;
592 msg->u.stream.mmap_len = mmap_len;
593 msg->u.stream.uid = uid;
594 msg->u.stream.gid = gid;
595 msg->u.stream.net_index = net_index;
596 msg->u.stream.metadata_flag = metadata_flag;
ca22feea 597 msg->u.stream.session_id = (uint64_t) session_id;
00e2e675
DG
598 strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
599 msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
600 strncpy(msg->u.stream.path_name, pathname,
601 sizeof(msg->u.stream.path_name));
602 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
603}
604
605/*
606 * Send stream communication structure to the consumer.
607 */
f50f23d9
DG
608int consumer_send_stream(struct consumer_socket *sock,
609 struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
610 int *fds, size_t nb_fd)
00e2e675
DG
611{
612 int ret;
613
614 assert(msg);
615 assert(dst);
f50f23d9 616 assert(sock);
00e2e675
DG
617
618 switch (dst->type) {
619 case CONSUMER_DST_NET:
620 /* Consumer should send the stream on the network. */
621 msg->u.stream.net_index = dst->net_seq_index;
622 break;
623 case CONSUMER_DST_LOCAL:
624 /* Add stream file name to stream path */
c30ce0b3
CB
625 strncat(msg->u.stream.path_name, "/",
626 sizeof(msg->u.stream.path_name) -
627 strlen(msg->u.stream.path_name) - 1);
00e2e675 628 strncat(msg->u.stream.path_name, msg->u.stream.name,
c30ce0b3
CB
629 sizeof(msg->u.stream.path_name) -
630 strlen(msg->u.stream.path_name) - 1);
00e2e675
DG
631 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
632 /* Indicate that the stream is NOT network */
633 msg->u.stream.net_index = -1;
634 break;
635 default:
636 ERR("Consumer unknown output type (%d)", dst->type);
637 ret = -1;
638 goto error;
639 }
640
641 /* Send on socket */
f50f23d9 642 ret = lttcomm_send_unix_sock(sock->fd, msg,
00e2e675
DG
643 sizeof(struct lttcomm_consumer_msg));
644 if (ret < 0) {
3448e266
DG
645 /* The above call will print a PERROR on error. */
646 DBG("Error when sending consumer stream on sock %d", sock->fd);
00e2e675
DG
647 goto error;
648 }
649
f50f23d9
DG
650 ret = consumer_recv_status_reply(sock);
651 if (ret < 0) {
652 goto error;
653 }
654
00e2e675
DG
655 ret = consumer_send_fds(sock, fds, nb_fd);
656 if (ret < 0) {
657 goto error;
658 }
659
660error:
661 return ret;
662}
37278a1e
DG
663
664/*
665 * Send relayd socket to consumer associated with a session name.
666 *
667 * On success return positive value. On error, negative value.
668 */
f50f23d9 669int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
37278a1e 670 struct lttcomm_sock *sock, struct consumer_output *consumer,
46e6455f 671 enum lttng_stream_type type, unsigned int session_id)
37278a1e
DG
672{
673 int ret;
674 struct lttcomm_consumer_msg msg;
675
676 /* Code flow error. Safety net. */
677 assert(sock);
678 assert(consumer);
f50f23d9 679 assert(consumer_sock);
37278a1e
DG
680
681 /* Bail out if consumer is disabled */
682 if (!consumer->enabled) {
f73fabfd 683 ret = LTTNG_OK;
37278a1e
DG
684 goto error;
685 }
686
687 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
688 /*
689 * Assign network consumer output index using the temporary consumer since
690 * this call should only be made from within a set_consumer_uri() function
691 * call in the session daemon.
692 */
693 msg.u.relayd_sock.net_index = consumer->net_seq_index;
694 msg.u.relayd_sock.type = type;
46e6455f 695 msg.u.relayd_sock.session_id = session_id;
37278a1e
DG
696 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
697
f50f23d9
DG
698 DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
699 ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
37278a1e 700 if (ret < 0) {
3448e266
DG
701 /* The above call will print a PERROR on error. */
702 DBG("Error when sending relayd sockets on sock %d", sock->fd);
37278a1e
DG
703 goto error;
704 }
705
f50f23d9
DG
706 ret = consumer_recv_status_reply(consumer_sock);
707 if (ret < 0) {
708 goto error;
709 }
710
37278a1e
DG
711 DBG3("Sending relayd socket file descriptor to consumer");
712 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
713 if (ret < 0) {
714 goto error;
715 }
716
717 DBG2("Consumer relayd socket sent");
718
719error:
720 return ret;
721}
173af62f
DG
722
723/*
2f77fc4b
DG
724 * Set consumer subdirectory using the session name and a generated datetime if
725 * needed. This is appended to the current subdirectory.
173af62f 726 */
2f77fc4b
DG
727int consumer_set_subdir(struct consumer_output *consumer,
728 const char *session_name)
173af62f 729{
2f77fc4b
DG
730 int ret = 0;
731 unsigned int have_default_name = 0;
732 char datetime[16], tmp_path[PATH_MAX];
733 time_t rawtime;
734 struct tm *timeinfo;
173af62f
DG
735
736 assert(consumer);
2f77fc4b
DG
737 assert(session_name);
738
739 memset(tmp_path, 0, sizeof(tmp_path));
740
741 /* Flag if we have a default session. */
742 if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
743 strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
744 have_default_name = 1;
745 } else {
746 /* Get date and time for session path */
747 time(&rawtime);
748 timeinfo = localtime(&rawtime);
749 strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
173af62f
DG
750 }
751
2f77fc4b
DG
752 if (have_default_name) {
753 ret = snprintf(tmp_path, sizeof(tmp_path),
754 "%s/%s", consumer->subdir, session_name);
755 } else {
756 ret = snprintf(tmp_path, sizeof(tmp_path),
757 "%s/%s-%s/", consumer->subdir, session_name, datetime);
758 }
173af62f 759 if (ret < 0) {
2f77fc4b 760 PERROR("snprintf session name date");
173af62f
DG
761 goto error;
762 }
763
2f77fc4b
DG
764 strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
765 DBG2("Consumer subdir set to %s", consumer->subdir);
173af62f
DG
766
767error:
768 return ret;
769}
806e2684
DG
770
771/*
6d805429 772 * Ask the consumer if the data is ready to read (NOT pending) for the specific
806e2684
DG
773 * session id.
774 *
775 * This function has a different behavior with the consumer i.e. that it waits
6d805429 776 * for a reply from the consumer if yes or no the data is pending.
806e2684 777 */
6d805429 778int consumer_is_data_pending(unsigned int id,
806e2684
DG
779 struct consumer_output *consumer)
780{
781 int ret;
6d805429 782 int32_t ret_code = 0; /* Default is that the data is NOT pending */
806e2684
DG
783 struct consumer_socket *socket;
784 struct lttng_ht_iter iter;
785 struct lttcomm_consumer_msg msg;
786
787 assert(consumer);
788
6d805429 789 msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
806e2684 790
6d805429 791 msg.u.data_pending.session_id = (uint64_t) id;
806e2684 792
6d805429 793 DBG3("Consumer data pending for id %u", id);
806e2684 794
c8f59ee5 795 /* Send command for each consumer */
806e2684
DG
796 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
797 node.node) {
798 /* Code flow error */
799 assert(socket->fd >= 0);
800
801 pthread_mutex_lock(socket->lock);
802
803 ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
804 if (ret < 0) {
3448e266
DG
805 /* The above call will print a PERROR on error. */
806 DBG("Error on consumer is data pending on sock %d", socket->fd);
806e2684
DG
807 pthread_mutex_unlock(socket->lock);
808 goto error;
809 }
810
f50f23d9
DG
811 /*
812 * No need for a recv reply status because the answer to the command is
813 * the reply status message.
814 */
815
806e2684 816 ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
a6cd2b97
DG
817 if (ret <= 0) {
818 if (ret == 0) {
819 /* Orderly shutdown. Don't return 0 which means success. */
820 ret = -1;
821 }
3448e266
DG
822 /* The above call will print a PERROR on error. */
823 DBG("Error on recv consumer is data pending on sock %d", socket->fd);
806e2684
DG
824 pthread_mutex_unlock(socket->lock);
825 goto error;
826 }
827
828 pthread_mutex_unlock(socket->lock);
829
6d805429 830 if (ret_code == 1) {
806e2684
DG
831 break;
832 }
833 }
834
3448e266
DG
835 DBG("Consumer data is %s pending for session id %u",
836 ret_code == 1 ? "" : "NOT", id);
806e2684
DG
837 return ret_code;
838
839error:
840 return -1;
841}
This page took 0.063847 seconds and 5 git commands to generate.