Fix: don't print EPIPE error which can happen
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _GNU_SOURCE
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
24#include <sys/types.h>
25#include <unistd.h>
26
27#include <common/common.h>
28#include <common/defaults.h>
29#include <common/uri.h>
30
31#include "consumer.h"
32
f50f23d9
DG
33/*
34 * Receive a reply command status message from the consumer. Consumer socket
35 * lock MUST be acquired before calling this function.
36 *
37 * Return 0 on success, -1 on recv error or a negative lttng error code which
38 * was possibly returned by the consumer.
39 */
40int consumer_recv_status_reply(struct consumer_socket *sock)
41{
42 int ret;
43 struct lttcomm_consumer_status_msg reply;
44
45 assert(sock);
46
47 ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
48 if (ret < 0) {
49 PERROR("recv consumer status msg");
50 goto end;
51 }
52
53 if (reply.ret_code == LTTNG_OK) {
54 /* All good. */
55 ret = 0;
56 } else {
57 ret = -reply.ret_code;
58 ERR("Consumer ret code %d", reply.ret_code);
59 }
60
61end:
62 return ret;
63}
64
2f77fc4b
DG
65/*
66 * Send destroy relayd command to consumer.
67 *
68 * On success return positive value. On error, negative value.
69 */
70int consumer_send_destroy_relayd(struct consumer_socket *sock,
71 struct consumer_output *consumer)
72{
73 int ret;
74 struct lttcomm_consumer_msg msg;
75
76 assert(consumer);
77 assert(sock);
78
79 DBG2("Sending destroy relayd command to consumer...");
80
81 /* Bail out if consumer is disabled */
82 if (!consumer->enabled) {
f73fabfd 83 ret = LTTNG_OK;
2f77fc4b
DG
84 DBG3("Consumer is disabled");
85 goto error;
86 }
87
88 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
89 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
90
91 pthread_mutex_lock(sock->lock);
92 ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
2f77fc4b 93 if (ret < 0) {
c5c45efa
DG
94 /* Indicate that the consumer is probably closing at this point. */
95 DBG("send consumer destroy relayd command");
f50f23d9 96 goto error_send;
2f77fc4b
DG
97 }
98
f50f23d9
DG
99 /* Don't check the return value. The caller will do it. */
100 ret = consumer_recv_status_reply(sock);
101
2f77fc4b
DG
102 DBG2("Consumer send destroy relayd command done");
103
f50f23d9
DG
104error_send:
105 pthread_mutex_unlock(sock->lock);
2f77fc4b
DG
106error:
107 return ret;
108}
109
110/*
111 * For each consumer socket in the consumer output object, send a destroy
112 * relayd command.
113 */
114void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
115{
2f77fc4b
DG
116 struct lttng_ht_iter iter;
117 struct consumer_socket *socket;
118
119 assert(consumer);
120
121 /* Destroy any relayd connection */
122 if (consumer && consumer->type == CONSUMER_DST_NET) {
123 rcu_read_lock();
124 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
125 node.node) {
c617c0c6
MD
126 int ret;
127
2f77fc4b
DG
128 /* Send destroy relayd command */
129 ret = consumer_send_destroy_relayd(socket, consumer);
130 if (ret < 0) {
c5c45efa 131 DBG("Unable to send destroy relayd command to consumer");
2f77fc4b
DG
132 /* Continue since we MUST delete everything at this point. */
133 }
134 }
135 rcu_read_unlock();
136 }
137}
138
a4b92340
DG
139/*
140 * From a consumer_data structure, allocate and add a consumer socket to the
141 * consumer output.
142 *
143 * Return 0 on success, else negative value on error
144 */
145int consumer_create_socket(struct consumer_data *data,
146 struct consumer_output *output)
147{
148 int ret = 0;
149 struct consumer_socket *socket;
150
151 assert(data);
152
153 if (output == NULL || data->cmd_sock < 0) {
154 /*
155 * Not an error. Possible there is simply not spawned consumer or it's
156 * disabled for the tracing session asking the socket.
157 */
158 goto error;
159 }
160
161 rcu_read_lock();
162 socket = consumer_find_socket(data->cmd_sock, output);
163 rcu_read_unlock();
164 if (socket == NULL) {
165 socket = consumer_allocate_socket(data->cmd_sock);
166 if (socket == NULL) {
167 ret = -1;
168 goto error;
169 }
170
2f77fc4b 171 socket->registered = 0;
a4b92340
DG
172 socket->lock = &data->lock;
173 rcu_read_lock();
174 consumer_add_socket(socket, output);
175 rcu_read_unlock();
176 }
177
178 DBG3("Consumer socket created (fd: %d) and added to output",
179 data->cmd_sock);
180
181error:
182 return ret;
183}
184
173af62f
DG
185/*
186 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
187 * be acquired before calling this function and across use of the
188 * returned consumer_socket.
189 */
190struct consumer_socket *consumer_find_socket(int key,
191 struct consumer_output *consumer)
192{
193 struct lttng_ht_iter iter;
194 struct lttng_ht_node_ulong *node;
195 struct consumer_socket *socket = NULL;
196
197 /* Negative keys are lookup failures */
a4b92340 198 if (key < 0 || consumer == NULL) {
173af62f
DG
199 return NULL;
200 }
201
202 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
203 &iter);
204 node = lttng_ht_iter_get_node_ulong(&iter);
205 if (node != NULL) {
206 socket = caa_container_of(node, struct consumer_socket, node);
207 }
208
209 return socket;
210}
211
212/*
213 * Allocate a new consumer_socket and return the pointer.
214 */
215struct consumer_socket *consumer_allocate_socket(int fd)
216{
217 struct consumer_socket *socket = NULL;
218
219 socket = zmalloc(sizeof(struct consumer_socket));
220 if (socket == NULL) {
221 PERROR("zmalloc consumer socket");
222 goto error;
223 }
224
225 socket->fd = fd;
226 lttng_ht_node_init_ulong(&socket->node, fd);
227
228error:
229 return socket;
230}
231
232/*
233 * Add consumer socket to consumer output object. Read side lock must be
234 * acquired before calling this function.
235 */
236void consumer_add_socket(struct consumer_socket *sock,
237 struct consumer_output *consumer)
238{
239 assert(sock);
240 assert(consumer);
241
242 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
243}
244
245/*
246 * Delte consumer socket to consumer output object. Read side lock must be
247 * acquired before calling this function.
248 */
249void consumer_del_socket(struct consumer_socket *sock,
250 struct consumer_output *consumer)
251{
252 int ret;
253 struct lttng_ht_iter iter;
254
255 assert(sock);
256 assert(consumer);
257
258 iter.iter.node = &sock->node.node;
259 ret = lttng_ht_del(consumer->socks, &iter);
260 assert(!ret);
261}
262
263/*
264 * RCU destroy call function.
265 */
266static void destroy_socket_rcu(struct rcu_head *head)
267{
268 struct lttng_ht_node_ulong *node =
269 caa_container_of(head, struct lttng_ht_node_ulong, head);
270 struct consumer_socket *socket =
271 caa_container_of(node, struct consumer_socket, node);
272
273 free(socket);
274}
275
276/*
277 * Destroy and free socket pointer in a call RCU. Read side lock must be
278 * acquired before calling this function.
279 */
280void consumer_destroy_socket(struct consumer_socket *sock)
281{
282 assert(sock);
283
284 /*
285 * We DO NOT close the file descriptor here since it is global to the
2f77fc4b
DG
286 * session daemon and is closed only if the consumer dies or a custom
287 * consumer was registered,
173af62f 288 */
2f77fc4b
DG
289 if (sock->registered) {
290 DBG3("Consumer socket was registered. Closing fd %d", sock->fd);
291 lttcomm_close_unix_sock(sock->fd);
292 }
173af62f
DG
293
294 call_rcu(&sock->node.head, destroy_socket_rcu);
295}
296
00e2e675
DG
297/*
298 * Allocate and assign data to a consumer_output object.
299 *
300 * Return pointer to structure.
301 */
302struct consumer_output *consumer_create_output(enum consumer_dst_type type)
303{
304 struct consumer_output *output = NULL;
305
306 output = zmalloc(sizeof(struct consumer_output));
307 if (output == NULL) {
308 PERROR("zmalloc consumer_output");
309 goto error;
310 }
311
312 /* By default, consumer output is enabled */
313 output->enabled = 1;
314 output->type = type;
315 output->net_seq_index = -1;
173af62f
DG
316
317 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675
DG
318
319error:
320 return output;
321}
322
323/*
324 * Delete the consumer_output object from the list and free the ptr.
325 */
326void consumer_destroy_output(struct consumer_output *obj)
327{
328 if (obj == NULL) {
329 return;
330 }
331
173af62f
DG
332 if (obj->socks) {
333 struct lttng_ht_iter iter;
334 struct consumer_socket *socket;
335
2f77fc4b 336 rcu_read_lock();
173af62f 337 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
2f77fc4b 338 consumer_del_socket(socket, obj);
173af62f
DG
339 consumer_destroy_socket(socket);
340 }
2f77fc4b
DG
341 rcu_read_unlock();
342
343 /* Finally destroy HT */
344 lttng_ht_destroy(obj->socks);
00e2e675 345 }
173af62f 346
00e2e675
DG
347 free(obj);
348}
349
350/*
351 * Copy consumer output and returned the newly allocated copy.
352 */
353struct consumer_output *consumer_copy_output(struct consumer_output *obj)
354{
09a90bcd 355 struct lttng_ht *tmp_ht_ptr;
173af62f
DG
356 struct lttng_ht_iter iter;
357 struct consumer_socket *socket, *copy_sock;
00e2e675
DG
358 struct consumer_output *output;
359
360 assert(obj);
361
362 output = consumer_create_output(obj->type);
363 if (output == NULL) {
364 goto error;
365 }
09a90bcd
DG
366 /* Avoid losing the HT reference after the memcpy() */
367 tmp_ht_ptr = output->socks;
00e2e675
DG
368
369 memcpy(output, obj, sizeof(struct consumer_output));
370
09a90bcd
DG
371 /* Putting back the HT pointer and start copying socket(s). */
372 output->socks = tmp_ht_ptr;
173af62f
DG
373
374 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
375 /* Create new socket object. */
376 copy_sock = consumer_allocate_socket(socket->fd);
377 if (copy_sock == NULL) {
378 goto malloc_error;
379 }
380
09a90bcd 381 copy_sock->registered = socket->registered;
173af62f
DG
382 copy_sock->lock = socket->lock;
383 consumer_add_socket(copy_sock, output);
384 }
385
00e2e675
DG
386error:
387 return output;
173af62f
DG
388
389malloc_error:
390 consumer_destroy_output(output);
391 return NULL;
00e2e675
DG
392}
393
394/*
395 * Set network URI to the consumer output object.
396 *
ad20f474
DG
397 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
398 * error.
00e2e675
DG
399 */
400int consumer_set_network_uri(struct consumer_output *obj,
401 struct lttng_uri *uri)
402{
403 int ret;
404 char tmp_path[PATH_MAX];
405 char hostname[HOST_NAME_MAX];
406 struct lttng_uri *dst_uri = NULL;
407
408 /* Code flow error safety net. */
409 assert(obj);
410 assert(uri);
411
412 switch (uri->stype) {
413 case LTTNG_STREAM_CONTROL:
414 dst_uri = &obj->dst.net.control;
415 obj->dst.net.control_isset = 1;
416 if (uri->port == 0) {
417 /* Assign default port. */
418 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
419 }
ad20f474 420 DBG3("Consumer control URI set with port %d", uri->port);
00e2e675
DG
421 break;
422 case LTTNG_STREAM_DATA:
423 dst_uri = &obj->dst.net.data;
424 obj->dst.net.data_isset = 1;
425 if (uri->port == 0) {
426 /* Assign default port. */
427 uri->port = DEFAULT_NETWORK_DATA_PORT;
428 }
ad20f474 429 DBG3("Consumer data URI set with port %d", uri->port);
00e2e675
DG
430 break;
431 default:
432 ERR("Set network uri type unknown %d", uri->stype);
433 goto error;
434 }
435
436 ret = uri_compare(dst_uri, uri);
437 if (!ret) {
438 /* Same URI, don't touch it and return success. */
439 DBG3("URI network compare are the same");
ad20f474 440 goto equal;
00e2e675
DG
441 }
442
443 /* URIs were not equal, replacing it. */
444 memset(dst_uri, 0, sizeof(struct lttng_uri));
445 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
446 obj->type = CONSUMER_DST_NET;
447
448 /* Handle subdir and add hostname in front. */
449 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
450 /* Get hostname to append it in the pathname */
451 ret = gethostname(hostname, sizeof(hostname));
452 if (ret < 0) {
453 PERROR("gethostname. Fallback on default localhost");
454 strncpy(hostname, "localhost", sizeof(hostname));
455 }
456 hostname[sizeof(hostname) - 1] = '\0';
457
458 /* Setup consumer subdir if none present in the control URI */
459 if (strlen(dst_uri->subdir) == 0) {
460 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
461 hostname, obj->subdir);
462 } else {
463 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
464 hostname, dst_uri->subdir);
465 }
466 if (ret < 0) {
467 PERROR("snprintf set consumer uri subdir");
468 goto error;
469 }
470
471 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
472 DBG3("Consumer set network uri subdir path %s", tmp_path);
473 }
474
00e2e675 475 return 0;
ad20f474
DG
476equal:
477 return 1;
00e2e675
DG
478error:
479 return -1;
480}
481
482/*
483 * Send file descriptor to consumer via sock.
484 */
f50f23d9 485int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
00e2e675
DG
486{
487 int ret;
488
489 assert(fds);
f50f23d9 490 assert(sock);
00e2e675
DG
491 assert(nb_fd > 0);
492
f50f23d9 493 ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
00e2e675
DG
494 if (ret < 0) {
495 PERROR("send consumer fds");
496 goto error;
497 }
498
f50f23d9
DG
499 ret = consumer_recv_status_reply(sock);
500
00e2e675
DG
501error:
502 return ret;
503}
504
505/*
506 * Consumer send channel communication message structure to consumer.
507 */
f50f23d9
DG
508int consumer_send_channel(struct consumer_socket *sock,
509 struct lttcomm_consumer_msg *msg)
00e2e675
DG
510{
511 int ret;
512
513 assert(msg);
f50f23d9
DG
514 assert(sock);
515 assert(sock->fd >= 0);
00e2e675 516
f50f23d9 517 ret = lttcomm_send_unix_sock(sock->fd, msg,
00e2e675
DG
518 sizeof(struct lttcomm_consumer_msg));
519 if (ret < 0) {
520 PERROR("send consumer channel");
521 goto error;
522 }
523
f50f23d9
DG
524 ret = consumer_recv_status_reply(sock);
525
00e2e675
DG
526error:
527 return ret;
528}
529
530/*
531 * Init channel communication message structure.
532 */
533void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
534 enum lttng_consumer_command cmd,
535 int channel_key,
536 uint64_t max_sb_size,
537 uint64_t mmap_len,
c30aaa51
MD
538 const char *name,
539 unsigned int nb_init_streams)
00e2e675
DG
540{
541 assert(msg);
542
543 /* TODO: Args validation */
544
545 /* Zeroed structure */
546 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
547
548 /* Send channel */
549 msg->cmd_type = cmd;
550 msg->u.channel.channel_key = channel_key;
551 msg->u.channel.max_sb_size = max_sb_size;
552 msg->u.channel.mmap_len = mmap_len;
c30aaa51 553 msg->u.channel.nb_init_streams = nb_init_streams;
00e2e675
DG
554}
555
556/*
557 * Init stream communication message structure.
558 */
559void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
560 enum lttng_consumer_command cmd,
561 int channel_key,
562 int stream_key,
563 uint32_t state,
564 enum lttng_event_output output,
565 uint64_t mmap_len,
566 uid_t uid,
567 gid_t gid,
568 int net_index,
569 unsigned int metadata_flag,
570 const char *name,
ca22feea
DG
571 const char *pathname,
572 unsigned int session_id)
00e2e675
DG
573{
574 assert(msg);
575
576 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
577
578 /* TODO: Args validation */
579
580 msg->cmd_type = cmd;
581 msg->u.stream.channel_key = channel_key;
582 msg->u.stream.stream_key = stream_key;
583 msg->u.stream.state = state;
584 msg->u.stream.output = output;
585 msg->u.stream.mmap_len = mmap_len;
586 msg->u.stream.uid = uid;
587 msg->u.stream.gid = gid;
588 msg->u.stream.net_index = net_index;
589 msg->u.stream.metadata_flag = metadata_flag;
ca22feea 590 msg->u.stream.session_id = (uint64_t) session_id;
00e2e675
DG
591 strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
592 msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
593 strncpy(msg->u.stream.path_name, pathname,
594 sizeof(msg->u.stream.path_name));
595 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
596}
597
598/*
599 * Send stream communication structure to the consumer.
600 */
f50f23d9
DG
601int consumer_send_stream(struct consumer_socket *sock,
602 struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
603 int *fds, size_t nb_fd)
00e2e675
DG
604{
605 int ret;
606
607 assert(msg);
608 assert(dst);
f50f23d9 609 assert(sock);
00e2e675
DG
610
611 switch (dst->type) {
612 case CONSUMER_DST_NET:
613 /* Consumer should send the stream on the network. */
614 msg->u.stream.net_index = dst->net_seq_index;
615 break;
616 case CONSUMER_DST_LOCAL:
617 /* Add stream file name to stream path */
c30ce0b3
CB
618 strncat(msg->u.stream.path_name, "/",
619 sizeof(msg->u.stream.path_name) -
620 strlen(msg->u.stream.path_name) - 1);
00e2e675 621 strncat(msg->u.stream.path_name, msg->u.stream.name,
c30ce0b3
CB
622 sizeof(msg->u.stream.path_name) -
623 strlen(msg->u.stream.path_name) - 1);
00e2e675
DG
624 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
625 /* Indicate that the stream is NOT network */
626 msg->u.stream.net_index = -1;
627 break;
628 default:
629 ERR("Consumer unknown output type (%d)", dst->type);
630 ret = -1;
631 goto error;
632 }
633
634 /* Send on socket */
f50f23d9 635 ret = lttcomm_send_unix_sock(sock->fd, msg,
00e2e675
DG
636 sizeof(struct lttcomm_consumer_msg));
637 if (ret < 0) {
638 PERROR("send consumer stream");
639 goto error;
640 }
641
f50f23d9
DG
642 ret = consumer_recv_status_reply(sock);
643 if (ret < 0) {
644 goto error;
645 }
646
00e2e675
DG
647 ret = consumer_send_fds(sock, fds, nb_fd);
648 if (ret < 0) {
649 goto error;
650 }
651
652error:
653 return ret;
654}
37278a1e
DG
655
656/*
657 * Send relayd socket to consumer associated with a session name.
658 *
659 * On success return positive value. On error, negative value.
660 */
f50f23d9 661int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
37278a1e 662 struct lttcomm_sock *sock, struct consumer_output *consumer,
46e6455f 663 enum lttng_stream_type type, unsigned int session_id)
37278a1e
DG
664{
665 int ret;
666 struct lttcomm_consumer_msg msg;
667
668 /* Code flow error. Safety net. */
669 assert(sock);
670 assert(consumer);
f50f23d9 671 assert(consumer_sock);
37278a1e
DG
672
673 /* Bail out if consumer is disabled */
674 if (!consumer->enabled) {
f73fabfd 675 ret = LTTNG_OK;
37278a1e
DG
676 goto error;
677 }
678
679 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
680 /*
681 * Assign network consumer output index using the temporary consumer since
682 * this call should only be made from within a set_consumer_uri() function
683 * call in the session daemon.
684 */
685 msg.u.relayd_sock.net_index = consumer->net_seq_index;
686 msg.u.relayd_sock.type = type;
46e6455f 687 msg.u.relayd_sock.session_id = session_id;
37278a1e
DG
688 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
689
f50f23d9
DG
690 DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
691 ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
37278a1e
DG
692 if (ret < 0) {
693 PERROR("send consumer relayd socket info");
694 goto error;
695 }
696
f50f23d9
DG
697 ret = consumer_recv_status_reply(consumer_sock);
698 if (ret < 0) {
699 goto error;
700 }
701
37278a1e
DG
702 DBG3("Sending relayd socket file descriptor to consumer");
703 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
704 if (ret < 0) {
705 goto error;
706 }
707
708 DBG2("Consumer relayd socket sent");
709
710error:
711 return ret;
712}
173af62f
DG
713
714/*
2f77fc4b
DG
715 * Set consumer subdirectory using the session name and a generated datetime if
716 * needed. This is appended to the current subdirectory.
173af62f 717 */
2f77fc4b
DG
718int consumer_set_subdir(struct consumer_output *consumer,
719 const char *session_name)
173af62f 720{
2f77fc4b
DG
721 int ret = 0;
722 unsigned int have_default_name = 0;
723 char datetime[16], tmp_path[PATH_MAX];
724 time_t rawtime;
725 struct tm *timeinfo;
173af62f
DG
726
727 assert(consumer);
2f77fc4b
DG
728 assert(session_name);
729
730 memset(tmp_path, 0, sizeof(tmp_path));
731
732 /* Flag if we have a default session. */
733 if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
734 strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
735 have_default_name = 1;
736 } else {
737 /* Get date and time for session path */
738 time(&rawtime);
739 timeinfo = localtime(&rawtime);
740 strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
173af62f
DG
741 }
742
2f77fc4b
DG
743 if (have_default_name) {
744 ret = snprintf(tmp_path, sizeof(tmp_path),
745 "%s/%s", consumer->subdir, session_name);
746 } else {
747 ret = snprintf(tmp_path, sizeof(tmp_path),
748 "%s/%s-%s/", consumer->subdir, session_name, datetime);
749 }
173af62f 750 if (ret < 0) {
2f77fc4b 751 PERROR("snprintf session name date");
173af62f
DG
752 goto error;
753 }
754
2f77fc4b
DG
755 strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
756 DBG2("Consumer subdir set to %s", consumer->subdir);
173af62f
DG
757
758error:
759 return ret;
760}
806e2684
DG
761
762/*
6d805429 763 * Ask the consumer if the data is ready to read (NOT pending) for the specific
806e2684
DG
764 * session id.
765 *
766 * This function has a different behavior with the consumer i.e. that it waits
6d805429 767 * for a reply from the consumer if yes or no the data is pending.
806e2684 768 */
6d805429 769int consumer_is_data_pending(unsigned int id,
806e2684
DG
770 struct consumer_output *consumer)
771{
772 int ret;
6d805429 773 int32_t ret_code = 0; /* Default is that the data is NOT pending */
806e2684
DG
774 struct consumer_socket *socket;
775 struct lttng_ht_iter iter;
776 struct lttcomm_consumer_msg msg;
777
778 assert(consumer);
779
6d805429 780 msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
806e2684 781
6d805429 782 msg.u.data_pending.session_id = (uint64_t) id;
806e2684 783
6d805429 784 DBG3("Consumer data pending for id %u", id);
806e2684 785
c8f59ee5 786 /* Send command for each consumer */
806e2684
DG
787 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
788 node.node) {
789 /* Code flow error */
790 assert(socket->fd >= 0);
791
792 pthread_mutex_lock(socket->lock);
793
794 ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
795 if (ret < 0) {
6d805429 796 PERROR("send consumer data pending command");
806e2684
DG
797 pthread_mutex_unlock(socket->lock);
798 goto error;
799 }
800
f50f23d9
DG
801 /*
802 * No need for a recv reply status because the answer to the command is
803 * the reply status message.
804 */
805
806e2684
DG
806 ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
807 if (ret < 0) {
6d805429 808 PERROR("recv consumer data pending status");
806e2684
DG
809 pthread_mutex_unlock(socket->lock);
810 goto error;
811 }
812
813 pthread_mutex_unlock(socket->lock);
814
6d805429 815 if (ret_code == 1) {
806e2684
DG
816 break;
817 }
818 }
819
6d805429 820 DBG("Consumer data pending ret %d", ret_code);
806e2684
DG
821 return ret_code;
822
823error:
824 return -1;
825}
This page took 0.0620540000000001 seconds and 5 git commands to generate.