Fix: DBG statement in relayd
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _GNU_SOURCE
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
24#include <sys/types.h>
25#include <unistd.h>
26
27#include <common/common.h>
28#include <common/defaults.h>
29#include <common/uri.h>
30
31#include "consumer.h"
32
f50f23d9
DG
33/*
34 * Receive a reply command status message from the consumer. Consumer socket
35 * lock MUST be acquired before calling this function.
36 *
37 * Return 0 on success, -1 on recv error or a negative lttng error code which
38 * was possibly returned by the consumer.
39 */
40int consumer_recv_status_reply(struct consumer_socket *sock)
41{
42 int ret;
43 struct lttcomm_consumer_status_msg reply;
44
45 assert(sock);
46
47 ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
48 if (ret < 0) {
49 PERROR("recv consumer status msg");
50 goto end;
51 }
52
53 if (reply.ret_code == LTTNG_OK) {
54 /* All good. */
55 ret = 0;
56 } else {
57 ret = -reply.ret_code;
58 ERR("Consumer ret code %d", reply.ret_code);
59 }
60
61end:
62 return ret;
63}
64
2f77fc4b
DG
65/*
66 * Send destroy relayd command to consumer.
67 *
68 * On success return positive value. On error, negative value.
69 */
70int consumer_send_destroy_relayd(struct consumer_socket *sock,
71 struct consumer_output *consumer)
72{
73 int ret;
74 struct lttcomm_consumer_msg msg;
75
76 assert(consumer);
77 assert(sock);
78
79 DBG2("Sending destroy relayd command to consumer...");
80
81 /* Bail out if consumer is disabled */
82 if (!consumer->enabled) {
f73fabfd 83 ret = LTTNG_OK;
2f77fc4b
DG
84 DBG3("Consumer is disabled");
85 goto error;
86 }
87
88 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
89 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
90
91 pthread_mutex_lock(sock->lock);
92 ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
2f77fc4b
DG
93 if (ret < 0) {
94 PERROR("send consumer destroy relayd command");
f50f23d9 95 goto error_send;
2f77fc4b
DG
96 }
97
f50f23d9
DG
98 /* Don't check the return value. The caller will do it. */
99 ret = consumer_recv_status_reply(sock);
100
2f77fc4b
DG
101 DBG2("Consumer send destroy relayd command done");
102
f50f23d9
DG
103error_send:
104 pthread_mutex_unlock(sock->lock);
2f77fc4b
DG
105error:
106 return ret;
107}
108
109/*
110 * For each consumer socket in the consumer output object, send a destroy
111 * relayd command.
112 */
113void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
114{
2f77fc4b
DG
115 struct lttng_ht_iter iter;
116 struct consumer_socket *socket;
117
118 assert(consumer);
119
120 /* Destroy any relayd connection */
121 if (consumer && consumer->type == CONSUMER_DST_NET) {
122 rcu_read_lock();
123 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
124 node.node) {
c617c0c6
MD
125 int ret;
126
2f77fc4b
DG
127 /* Send destroy relayd command */
128 ret = consumer_send_destroy_relayd(socket, consumer);
129 if (ret < 0) {
130 ERR("Unable to send destroy relayd command to consumer");
131 /* Continue since we MUST delete everything at this point. */
132 }
133 }
134 rcu_read_unlock();
135 }
136}
137
a4b92340
DG
138/*
139 * From a consumer_data structure, allocate and add a consumer socket to the
140 * consumer output.
141 *
142 * Return 0 on success, else negative value on error
143 */
144int consumer_create_socket(struct consumer_data *data,
145 struct consumer_output *output)
146{
147 int ret = 0;
148 struct consumer_socket *socket;
149
150 assert(data);
151
152 if (output == NULL || data->cmd_sock < 0) {
153 /*
154 * Not an error. Possible there is simply not spawned consumer or it's
155 * disabled for the tracing session asking the socket.
156 */
157 goto error;
158 }
159
160 rcu_read_lock();
161 socket = consumer_find_socket(data->cmd_sock, output);
162 rcu_read_unlock();
163 if (socket == NULL) {
164 socket = consumer_allocate_socket(data->cmd_sock);
165 if (socket == NULL) {
166 ret = -1;
167 goto error;
168 }
169
2f77fc4b 170 socket->registered = 0;
a4b92340
DG
171 socket->lock = &data->lock;
172 rcu_read_lock();
173 consumer_add_socket(socket, output);
174 rcu_read_unlock();
175 }
176
177 DBG3("Consumer socket created (fd: %d) and added to output",
178 data->cmd_sock);
179
180error:
181 return ret;
182}
183
173af62f
DG
184/*
185 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
186 * be acquired before calling this function and across use of the
187 * returned consumer_socket.
188 */
189struct consumer_socket *consumer_find_socket(int key,
190 struct consumer_output *consumer)
191{
192 struct lttng_ht_iter iter;
193 struct lttng_ht_node_ulong *node;
194 struct consumer_socket *socket = NULL;
195
196 /* Negative keys are lookup failures */
a4b92340 197 if (key < 0 || consumer == NULL) {
173af62f
DG
198 return NULL;
199 }
200
201 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
202 &iter);
203 node = lttng_ht_iter_get_node_ulong(&iter);
204 if (node != NULL) {
205 socket = caa_container_of(node, struct consumer_socket, node);
206 }
207
208 return socket;
209}
210
211/*
212 * Allocate a new consumer_socket and return the pointer.
213 */
214struct consumer_socket *consumer_allocate_socket(int fd)
215{
216 struct consumer_socket *socket = NULL;
217
218 socket = zmalloc(sizeof(struct consumer_socket));
219 if (socket == NULL) {
220 PERROR("zmalloc consumer socket");
221 goto error;
222 }
223
224 socket->fd = fd;
225 lttng_ht_node_init_ulong(&socket->node, fd);
226
227error:
228 return socket;
229}
230
231/*
232 * Add consumer socket to consumer output object. Read side lock must be
233 * acquired before calling this function.
234 */
235void consumer_add_socket(struct consumer_socket *sock,
236 struct consumer_output *consumer)
237{
238 assert(sock);
239 assert(consumer);
240
241 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
242}
243
244/*
245 * Delte consumer socket to consumer output object. Read side lock must be
246 * acquired before calling this function.
247 */
248void consumer_del_socket(struct consumer_socket *sock,
249 struct consumer_output *consumer)
250{
251 int ret;
252 struct lttng_ht_iter iter;
253
254 assert(sock);
255 assert(consumer);
256
257 iter.iter.node = &sock->node.node;
258 ret = lttng_ht_del(consumer->socks, &iter);
259 assert(!ret);
260}
261
262/*
263 * RCU destroy call function.
264 */
265static void destroy_socket_rcu(struct rcu_head *head)
266{
267 struct lttng_ht_node_ulong *node =
268 caa_container_of(head, struct lttng_ht_node_ulong, head);
269 struct consumer_socket *socket =
270 caa_container_of(node, struct consumer_socket, node);
271
272 free(socket);
273}
274
275/*
276 * Destroy and free socket pointer in a call RCU. Read side lock must be
277 * acquired before calling this function.
278 */
279void consumer_destroy_socket(struct consumer_socket *sock)
280{
281 assert(sock);
282
283 /*
284 * We DO NOT close the file descriptor here since it is global to the
2f77fc4b
DG
285 * session daemon and is closed only if the consumer dies or a custom
286 * consumer was registered,
173af62f 287 */
2f77fc4b
DG
288 if (sock->registered) {
289 DBG3("Consumer socket was registered. Closing fd %d", sock->fd);
290 lttcomm_close_unix_sock(sock->fd);
291 }
173af62f
DG
292
293 call_rcu(&sock->node.head, destroy_socket_rcu);
294}
295
00e2e675
DG
296/*
297 * Allocate and assign data to a consumer_output object.
298 *
299 * Return pointer to structure.
300 */
301struct consumer_output *consumer_create_output(enum consumer_dst_type type)
302{
303 struct consumer_output *output = NULL;
304
305 output = zmalloc(sizeof(struct consumer_output));
306 if (output == NULL) {
307 PERROR("zmalloc consumer_output");
308 goto error;
309 }
310
311 /* By default, consumer output is enabled */
312 output->enabled = 1;
313 output->type = type;
314 output->net_seq_index = -1;
173af62f
DG
315
316 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675
DG
317
318error:
319 return output;
320}
321
322/*
323 * Delete the consumer_output object from the list and free the ptr.
324 */
325void consumer_destroy_output(struct consumer_output *obj)
326{
327 if (obj == NULL) {
328 return;
329 }
330
173af62f
DG
331 if (obj->socks) {
332 struct lttng_ht_iter iter;
333 struct consumer_socket *socket;
334
2f77fc4b 335 rcu_read_lock();
173af62f 336 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
2f77fc4b 337 consumer_del_socket(socket, obj);
173af62f
DG
338 consumer_destroy_socket(socket);
339 }
2f77fc4b
DG
340 rcu_read_unlock();
341
342 /* Finally destroy HT */
343 lttng_ht_destroy(obj->socks);
00e2e675 344 }
173af62f 345
00e2e675
DG
346 free(obj);
347}
348
349/*
350 * Copy consumer output and returned the newly allocated copy.
351 */
352struct consumer_output *consumer_copy_output(struct consumer_output *obj)
353{
09a90bcd 354 struct lttng_ht *tmp_ht_ptr;
173af62f
DG
355 struct lttng_ht_iter iter;
356 struct consumer_socket *socket, *copy_sock;
00e2e675
DG
357 struct consumer_output *output;
358
359 assert(obj);
360
361 output = consumer_create_output(obj->type);
362 if (output == NULL) {
363 goto error;
364 }
09a90bcd
DG
365 /* Avoid losing the HT reference after the memcpy() */
366 tmp_ht_ptr = output->socks;
00e2e675
DG
367
368 memcpy(output, obj, sizeof(struct consumer_output));
369
09a90bcd
DG
370 /* Putting back the HT pointer and start copying socket(s). */
371 output->socks = tmp_ht_ptr;
173af62f
DG
372
373 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
374 /* Create new socket object. */
375 copy_sock = consumer_allocate_socket(socket->fd);
376 if (copy_sock == NULL) {
377 goto malloc_error;
378 }
379
09a90bcd 380 copy_sock->registered = socket->registered;
173af62f
DG
381 copy_sock->lock = socket->lock;
382 consumer_add_socket(copy_sock, output);
383 }
384
00e2e675
DG
385error:
386 return output;
173af62f
DG
387
388malloc_error:
389 consumer_destroy_output(output);
390 return NULL;
00e2e675
DG
391}
392
393/*
394 * Set network URI to the consumer output object.
395 *
ad20f474
DG
396 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
397 * error.
00e2e675
DG
398 */
399int consumer_set_network_uri(struct consumer_output *obj,
400 struct lttng_uri *uri)
401{
402 int ret;
403 char tmp_path[PATH_MAX];
404 char hostname[HOST_NAME_MAX];
405 struct lttng_uri *dst_uri = NULL;
406
407 /* Code flow error safety net. */
408 assert(obj);
409 assert(uri);
410
411 switch (uri->stype) {
412 case LTTNG_STREAM_CONTROL:
413 dst_uri = &obj->dst.net.control;
414 obj->dst.net.control_isset = 1;
415 if (uri->port == 0) {
416 /* Assign default port. */
417 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
418 }
ad20f474 419 DBG3("Consumer control URI set with port %d", uri->port);
00e2e675
DG
420 break;
421 case LTTNG_STREAM_DATA:
422 dst_uri = &obj->dst.net.data;
423 obj->dst.net.data_isset = 1;
424 if (uri->port == 0) {
425 /* Assign default port. */
426 uri->port = DEFAULT_NETWORK_DATA_PORT;
427 }
ad20f474 428 DBG3("Consumer data URI set with port %d", uri->port);
00e2e675
DG
429 break;
430 default:
431 ERR("Set network uri type unknown %d", uri->stype);
432 goto error;
433 }
434
435 ret = uri_compare(dst_uri, uri);
436 if (!ret) {
437 /* Same URI, don't touch it and return success. */
438 DBG3("URI network compare are the same");
ad20f474 439 goto equal;
00e2e675
DG
440 }
441
442 /* URIs were not equal, replacing it. */
443 memset(dst_uri, 0, sizeof(struct lttng_uri));
444 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
445 obj->type = CONSUMER_DST_NET;
446
447 /* Handle subdir and add hostname in front. */
448 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
449 /* Get hostname to append it in the pathname */
450 ret = gethostname(hostname, sizeof(hostname));
451 if (ret < 0) {
452 PERROR("gethostname. Fallback on default localhost");
453 strncpy(hostname, "localhost", sizeof(hostname));
454 }
455 hostname[sizeof(hostname) - 1] = '\0';
456
457 /* Setup consumer subdir if none present in the control URI */
458 if (strlen(dst_uri->subdir) == 0) {
459 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
460 hostname, obj->subdir);
461 } else {
462 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
463 hostname, dst_uri->subdir);
464 }
465 if (ret < 0) {
466 PERROR("snprintf set consumer uri subdir");
467 goto error;
468 }
469
470 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
471 DBG3("Consumer set network uri subdir path %s", tmp_path);
472 }
473
00e2e675 474 return 0;
ad20f474
DG
475equal:
476 return 1;
00e2e675
DG
477error:
478 return -1;
479}
480
481/*
482 * Send file descriptor to consumer via sock.
483 */
f50f23d9 484int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
00e2e675
DG
485{
486 int ret;
487
488 assert(fds);
f50f23d9 489 assert(sock);
00e2e675
DG
490 assert(nb_fd > 0);
491
f50f23d9 492 ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
00e2e675
DG
493 if (ret < 0) {
494 PERROR("send consumer fds");
495 goto error;
496 }
497
f50f23d9
DG
498 ret = consumer_recv_status_reply(sock);
499
00e2e675
DG
500error:
501 return ret;
502}
503
504/*
505 * Consumer send channel communication message structure to consumer.
506 */
f50f23d9
DG
507int consumer_send_channel(struct consumer_socket *sock,
508 struct lttcomm_consumer_msg *msg)
00e2e675
DG
509{
510 int ret;
511
512 assert(msg);
f50f23d9
DG
513 assert(sock);
514 assert(sock->fd >= 0);
00e2e675 515
f50f23d9 516 ret = lttcomm_send_unix_sock(sock->fd, msg,
00e2e675
DG
517 sizeof(struct lttcomm_consumer_msg));
518 if (ret < 0) {
519 PERROR("send consumer channel");
520 goto error;
521 }
522
f50f23d9
DG
523 ret = consumer_recv_status_reply(sock);
524
00e2e675
DG
525error:
526 return ret;
527}
528
529/*
530 * Init channel communication message structure.
531 */
532void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
533 enum lttng_consumer_command cmd,
534 int channel_key,
535 uint64_t max_sb_size,
536 uint64_t mmap_len,
c30aaa51
MD
537 const char *name,
538 unsigned int nb_init_streams)
00e2e675
DG
539{
540 assert(msg);
541
542 /* TODO: Args validation */
543
544 /* Zeroed structure */
545 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
546
547 /* Send channel */
548 msg->cmd_type = cmd;
549 msg->u.channel.channel_key = channel_key;
550 msg->u.channel.max_sb_size = max_sb_size;
551 msg->u.channel.mmap_len = mmap_len;
c30aaa51 552 msg->u.channel.nb_init_streams = nb_init_streams;
00e2e675
DG
553}
554
555/*
556 * Init stream communication message structure.
557 */
558void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
559 enum lttng_consumer_command cmd,
560 int channel_key,
561 int stream_key,
562 uint32_t state,
563 enum lttng_event_output output,
564 uint64_t mmap_len,
565 uid_t uid,
566 gid_t gid,
567 int net_index,
568 unsigned int metadata_flag,
569 const char *name,
ca22feea
DG
570 const char *pathname,
571 unsigned int session_id)
00e2e675
DG
572{
573 assert(msg);
574
575 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
576
577 /* TODO: Args validation */
578
579 msg->cmd_type = cmd;
580 msg->u.stream.channel_key = channel_key;
581 msg->u.stream.stream_key = stream_key;
582 msg->u.stream.state = state;
583 msg->u.stream.output = output;
584 msg->u.stream.mmap_len = mmap_len;
585 msg->u.stream.uid = uid;
586 msg->u.stream.gid = gid;
587 msg->u.stream.net_index = net_index;
588 msg->u.stream.metadata_flag = metadata_flag;
ca22feea 589 msg->u.stream.session_id = (uint64_t) session_id;
00e2e675
DG
590 strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
591 msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
592 strncpy(msg->u.stream.path_name, pathname,
593 sizeof(msg->u.stream.path_name));
594 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
595}
596
597/*
598 * Send stream communication structure to the consumer.
599 */
f50f23d9
DG
600int consumer_send_stream(struct consumer_socket *sock,
601 struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
602 int *fds, size_t nb_fd)
00e2e675
DG
603{
604 int ret;
605
606 assert(msg);
607 assert(dst);
f50f23d9 608 assert(sock);
00e2e675
DG
609
610 switch (dst->type) {
611 case CONSUMER_DST_NET:
612 /* Consumer should send the stream on the network. */
613 msg->u.stream.net_index = dst->net_seq_index;
614 break;
615 case CONSUMER_DST_LOCAL:
616 /* Add stream file name to stream path */
c30ce0b3
CB
617 strncat(msg->u.stream.path_name, "/",
618 sizeof(msg->u.stream.path_name) -
619 strlen(msg->u.stream.path_name) - 1);
00e2e675 620 strncat(msg->u.stream.path_name, msg->u.stream.name,
c30ce0b3
CB
621 sizeof(msg->u.stream.path_name) -
622 strlen(msg->u.stream.path_name) - 1);
00e2e675
DG
623 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
624 /* Indicate that the stream is NOT network */
625 msg->u.stream.net_index = -1;
626 break;
627 default:
628 ERR("Consumer unknown output type (%d)", dst->type);
629 ret = -1;
630 goto error;
631 }
632
633 /* Send on socket */
f50f23d9 634 ret = lttcomm_send_unix_sock(sock->fd, msg,
00e2e675
DG
635 sizeof(struct lttcomm_consumer_msg));
636 if (ret < 0) {
637 PERROR("send consumer stream");
638 goto error;
639 }
640
f50f23d9
DG
641 ret = consumer_recv_status_reply(sock);
642 if (ret < 0) {
643 goto error;
644 }
645
00e2e675
DG
646 ret = consumer_send_fds(sock, fds, nb_fd);
647 if (ret < 0) {
648 goto error;
649 }
650
651error:
652 return ret;
653}
37278a1e
DG
654
655/*
656 * Send relayd socket to consumer associated with a session name.
657 *
658 * On success return positive value. On error, negative value.
659 */
f50f23d9 660int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
37278a1e 661 struct lttcomm_sock *sock, struct consumer_output *consumer,
46e6455f 662 enum lttng_stream_type type, unsigned int session_id)
37278a1e
DG
663{
664 int ret;
665 struct lttcomm_consumer_msg msg;
666
667 /* Code flow error. Safety net. */
668 assert(sock);
669 assert(consumer);
f50f23d9 670 assert(consumer_sock);
37278a1e
DG
671
672 /* Bail out if consumer is disabled */
673 if (!consumer->enabled) {
f73fabfd 674 ret = LTTNG_OK;
37278a1e
DG
675 goto error;
676 }
677
678 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
679 /*
680 * Assign network consumer output index using the temporary consumer since
681 * this call should only be made from within a set_consumer_uri() function
682 * call in the session daemon.
683 */
684 msg.u.relayd_sock.net_index = consumer->net_seq_index;
685 msg.u.relayd_sock.type = type;
46e6455f 686 msg.u.relayd_sock.session_id = session_id;
37278a1e
DG
687 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
688
f50f23d9
DG
689 DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
690 ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
37278a1e
DG
691 if (ret < 0) {
692 PERROR("send consumer relayd socket info");
693 goto error;
694 }
695
f50f23d9
DG
696 ret = consumer_recv_status_reply(consumer_sock);
697 if (ret < 0) {
698 goto error;
699 }
700
37278a1e
DG
701 DBG3("Sending relayd socket file descriptor to consumer");
702 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
703 if (ret < 0) {
704 goto error;
705 }
706
707 DBG2("Consumer relayd socket sent");
708
709error:
710 return ret;
711}
173af62f
DG
712
713/*
2f77fc4b
DG
714 * Set consumer subdirectory using the session name and a generated datetime if
715 * needed. This is appended to the current subdirectory.
173af62f 716 */
2f77fc4b
DG
717int consumer_set_subdir(struct consumer_output *consumer,
718 const char *session_name)
173af62f 719{
2f77fc4b
DG
720 int ret = 0;
721 unsigned int have_default_name = 0;
722 char datetime[16], tmp_path[PATH_MAX];
723 time_t rawtime;
724 struct tm *timeinfo;
173af62f
DG
725
726 assert(consumer);
2f77fc4b
DG
727 assert(session_name);
728
729 memset(tmp_path, 0, sizeof(tmp_path));
730
731 /* Flag if we have a default session. */
732 if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
733 strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
734 have_default_name = 1;
735 } else {
736 /* Get date and time for session path */
737 time(&rawtime);
738 timeinfo = localtime(&rawtime);
739 strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
173af62f
DG
740 }
741
2f77fc4b
DG
742 if (have_default_name) {
743 ret = snprintf(tmp_path, sizeof(tmp_path),
744 "%s/%s", consumer->subdir, session_name);
745 } else {
746 ret = snprintf(tmp_path, sizeof(tmp_path),
747 "%s/%s-%s/", consumer->subdir, session_name, datetime);
748 }
173af62f 749 if (ret < 0) {
2f77fc4b 750 PERROR("snprintf session name date");
173af62f
DG
751 goto error;
752 }
753
2f77fc4b
DG
754 strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
755 DBG2("Consumer subdir set to %s", consumer->subdir);
173af62f
DG
756
757error:
758 return ret;
759}
806e2684
DG
760
761/*
6d805429 762 * Ask the consumer if the data is ready to read (NOT pending) for the specific
806e2684
DG
763 * session id.
764 *
765 * This function has a different behavior with the consumer i.e. that it waits
6d805429 766 * for a reply from the consumer if yes or no the data is pending.
806e2684 767 */
6d805429 768int consumer_is_data_pending(unsigned int id,
806e2684
DG
769 struct consumer_output *consumer)
770{
771 int ret;
6d805429 772 int32_t ret_code = 0; /* Default is that the data is NOT pending */
806e2684
DG
773 struct consumer_socket *socket;
774 struct lttng_ht_iter iter;
775 struct lttcomm_consumer_msg msg;
776
777 assert(consumer);
778
6d805429 779 msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
806e2684 780
6d805429 781 msg.u.data_pending.session_id = (uint64_t) id;
806e2684 782
6d805429 783 DBG3("Consumer data pending for id %u", id);
806e2684 784
c8f59ee5 785 /* Send command for each consumer */
806e2684
DG
786 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
787 node.node) {
788 /* Code flow error */
789 assert(socket->fd >= 0);
790
791 pthread_mutex_lock(socket->lock);
792
793 ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
794 if (ret < 0) {
6d805429 795 PERROR("send consumer data pending command");
806e2684
DG
796 pthread_mutex_unlock(socket->lock);
797 goto error;
798 }
799
f50f23d9
DG
800 /*
801 * No need for a recv reply status because the answer to the command is
802 * the reply status message.
803 */
804
806e2684
DG
805 ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
806 if (ret < 0) {
6d805429 807 PERROR("recv consumer data pending status");
806e2684
DG
808 pthread_mutex_unlock(socket->lock);
809 goto error;
810 }
811
812 pthread_mutex_unlock(socket->lock);
813
6d805429 814 if (ret_code == 1) {
806e2684
DG
815 break;
816 }
817 }
818
6d805429 819 DBG("Consumer data pending ret %d", ret_code);
806e2684
DG
820 return ret_code;
821
822error:
823 return -1;
824}
This page took 0.061529 seconds and 5 git commands to generate.