/*
 * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */

#include <stdint.h>
#define _LGPL_SOURCE
#include <assert.h>
#include <lttng/ust-ctl.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <inttypes.h>
#include <unistd.h>
#include <urcu/list.h>
#include <signal.h>
#include <stdbool.h>

#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/relayd/relayd.h>
#include <common/compat/fcntl.h>
#include <common/compat/endian.h>
#include <common/consumer/consumer-metadata-cache.h>
#include <common/consumer/consumer-stream.h>
#include <common/consumer/consumer-timer.h>
#include <common/utils.h>
#include <common/index/index.h>

#include "ust-consumer.h"

#define INT_MAX_STR_LEN 12 /* sign, 10 digits and '\0' */

extern struct lttng_consumer_global_data consumer_data;
extern int consumer_poll_timeout;

/*
 * Free the channel object and all streams associated with it. This MUST be
 * used if and only if the channel has _NEVER_ been added to the global
 * channel hash table.
 */
static void destroy_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream, *stmp;

	assert(channel);

	DBG("UST consumer cleaning stream list");

	cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
			send_node) {

		health_code_update();

		cds_list_del(&stream->send_node);
		ustctl_destroy_stream(stream->ustream);
		lttng_trace_chunk_put(stream->trace_chunk);
		free(stream);
	}

	/*
	 * If the channel's ust channel object exists, meaning that it was
	 * created before the streams were, delete it.
	 */
	if (channel->uchan) {
		lttng_ustconsumer_del_channel(channel);
		lttng_ustconsumer_free_channel(channel);
	}
	free(channel);
}

/*
 * Add channel to internal consumer state.
 *
 * Returns 0 on success or else a negative value.
 */
static int add_channel(struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx)
{
	int ret = 0;

	assert(channel);
	assert(ctx);

	if (ctx->on_recv_channel != NULL) {
		ret = ctx->on_recv_channel(channel);
		if (ret == 0) {
			ret = consumer_add_channel(channel, ctx);
		} else if (ret < 0) {
			/* Most likely an ENOMEM. */
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
			goto error;
		}
	} else {
		ret = consumer_add_channel(channel, ctx);
	}

	DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);

error:
	return ret;
}

/*
 * Allocate and return a consumer channel object.
 */
static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
		const uint64_t *chunk_id, const char *pathname, const char *name,
		uint64_t relayd_id, uint64_t key, enum lttng_event_output output,
		uint64_t tracefile_size, uint64_t tracefile_count,
		uint64_t session_id_per_pid, unsigned int monitor,
		unsigned int live_timer_interval,
		const char *root_shm_path, const char *shm_path)
{
	assert(pathname);
	assert(name);

	return consumer_allocate_channel(key, session_id, chunk_id, pathname,
			name, relayd_id, output, tracefile_size,
			tracefile_count, session_id_per_pid, monitor,
			live_timer_interval, root_shm_path, shm_path);
}

/*
 * Allocate and return a consumer stream object. If _alloc_ret is not NULL,
 * the error value, if applicable, is stored in it; otherwise it is left
 * untouched.
 *
 * Return NULL on error else the newly allocated stream object.
 */
static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
		struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx, int *_alloc_ret)
{
	int alloc_ret;
	struct lttng_consumer_stream *stream = NULL;

	assert(channel);
	assert(ctx);

	stream = consumer_allocate_stream(
			channel,
			channel->key,
			key,
			channel->name,
			channel->relayd_id,
			channel->session_id,
			channel->trace_chunk,
			cpu,
			&alloc_ret,
			channel->type,
			channel->monitor);
	if (stream == NULL) {
		switch (alloc_ret) {
		case -ENOENT:
			/*
			 * We could not find the channel. Can happen if cpu hotplug
			 * happens while tearing down.
			 */
			DBG3("Could not find channel");
			break;
		case -ENOMEM:
		case -EINVAL:
		default:
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
			break;
		}
		goto error;
	}

	consumer_stream_update_channel_attributes(stream, channel);

error:
	if (_alloc_ret) {
		*_alloc_ret = alloc_ret;
	}
	return stream;
}

/*
 * Send the given stream pointer to the corresponding thread.
 *
 * Returns 0 on success else a negative value.
 */
static int send_stream_to_thread(struct lttng_consumer_stream *stream,
		struct lttng_consumer_local_data *ctx)
{
	int ret;
	struct lttng_pipe *stream_pipe;

	/* Get the right pipe where the stream will be sent. */
	if (stream->metadata_flag) {
		consumer_add_metadata_stream(stream);
		stream_pipe = ctx->consumer_metadata_pipe;
	} else {
		consumer_add_data_stream(stream);
		stream_pipe = ctx->consumer_data_pipe;
	}

	/*
	 * From this point on, the stream's ownership has been moved away from
	 * the channel and it becomes globally visible. Hence, remove it from
	 * the local stream list to prevent the stream from being both local and
	 * global.
	 */
	stream->globally_visible = 1;
	cds_list_del(&stream->send_node);

	ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
	if (ret < 0) {
		ERR("Consumer write %s stream to pipe %d",
				stream->metadata_flag ? "metadata" : "data",
				lttng_pipe_get_writefd(stream_pipe));
		if (stream->metadata_flag) {
			consumer_del_stream_for_metadata(stream);
		} else {
			consumer_del_stream_for_data(stream);
		}
		goto error;
	}
	/* Normalize to the documented "0 on success" return contract. */
	ret = 0;

error:
	return ret;
}
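
/*
 * Editorial sketch (not upstream documentation): the receiving end of the
 * handoff above is the data or metadata poll thread, which reads the raw
 * stream pointer back out of the pipe, along the lines of:
 *
 *	struct lttng_consumer_stream *stream;
 *
 *	(void) lttng_pipe_read(stream_pipe, &stream, sizeof(stream));
 *
 * See the poll threads in consumer.c for the actual handling.
 */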

static
int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu)
{
	char cpu_nr[INT_MAX_STR_LEN]; /* int max len */
	int ret;

	strncpy(stream_shm_path, shm_path, PATH_MAX);
	stream_shm_path[PATH_MAX - 1] = '\0';
	ret = snprintf(cpu_nr, INT_MAX_STR_LEN, "%i", cpu);
	if (ret < 0) {
		PERROR("snprintf");
		goto end;
	}
	strncat(stream_shm_path, cpu_nr,
			PATH_MAX - strlen(stream_shm_path) - 1);
	ret = 0;
end:
	return ret;
}
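
/*
 * Usage sketch (illustrative only; the path is a made-up example): the stream
 * shm path is simply the channel shm path with the cpu number appended, e.g.
 *
 *	char path[PATH_MAX];
 *
 *	if (get_stream_shm_path(path, "/dev/shm/my-chan", 2) == 0) {
 *		// path now holds "/dev/shm/my-chan2"
 *	}
 */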

/*
 * Create streams for the given channel using liblttng-ust-ctl.
 * The channel lock must be acquired by the caller.
 *
 * Return 0 on success else a negative value.
 */
static int create_ust_streams(struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx)
{
	int ret, cpu = 0;
	struct ustctl_consumer_stream *ustream;
	struct lttng_consumer_stream *stream;
	pthread_mutex_t *current_stream_lock = NULL;

	assert(channel);
	assert(ctx);

	/*
	 * Loop while a stream is available from ustctl. When NULL is returned,
	 * we've reached the end of the possible streams for the channel.
	 */
	while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
		int wait_fd;
		int ust_metadata_pipe[2];

		health_code_update();

		if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
			ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
			if (ret < 0) {
				ERR("Create ust metadata poll pipe");
				goto error;
			}
			wait_fd = ust_metadata_pipe[0];
		} else {
			wait_fd = ustctl_stream_get_wait_fd(ustream);
		}

		/* Allocate consumer stream object. */
		stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
		if (!stream) {
			goto error_alloc;
		}
		stream->ustream = ustream;
		/*
		 * Store it so we avoid repeated function calls afterwards, since
		 * this value is used heavily in the stream threads. This is UST
		 * specific, which is why it is done after allocation.
		 */
		stream->wait_fd = wait_fd;

		/*
		 * Increment channel refcount since the channel reference has now been
		 * assigned in the allocation process above.
		 */
		if (stream->chan->monitor) {
			uatomic_inc(&stream->chan->refcount);
		}

		pthread_mutex_lock(&stream->lock);
		current_stream_lock = &stream->lock;
		/*
		 * Order is important; this is why a list is used. On error, the
		 * caller should clean up this list.
		 */
		cds_list_add_tail(&stream->send_node, &channel->streams.head);

		ret = ustctl_get_max_subbuf_size(stream->ustream,
				&stream->max_sb_size);
		if (ret < 0) {
			ERR("ustctl_get_max_subbuf_size failed for stream %s",
					stream->name);
			goto error;
		}

		/* Do actions once stream has been received. */
		if (ctx->on_recv_stream) {
			ret = ctx->on_recv_stream(stream);
			if (ret < 0) {
				goto error;
			}
		}

		DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
				stream->name, stream->key, stream->relayd_stream_id);

		/* Set next CPU stream. */
		channel->streams.count = ++cpu;

		/* Keep stream reference when creating metadata. */
		if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
			channel->metadata_stream = stream;
			if (channel->monitor) {
				/* Set metadata poll pipe if we created one */
				memcpy(stream->ust_metadata_poll_pipe,
						ust_metadata_pipe,
						sizeof(ust_metadata_pipe));
			}
		}
		pthread_mutex_unlock(&stream->lock);
		current_stream_lock = NULL;
	}

	return 0;

error:
error_alloc:
	if (current_stream_lock) {
		pthread_mutex_unlock(current_stream_lock);
	}
	return ret;
}

/*
 * create_posix_shm is never called concurrently within a process.
 */
static
int create_posix_shm(void)
{
	char tmp_name[NAME_MAX];
	int shmfd, ret;

	ret = snprintf(tmp_name, NAME_MAX, "/ust-shm-consumer-%d", getpid());
	if (ret < 0) {
		PERROR("snprintf");
		return -1;
	}
	/*
	 * Allocate the shm, and immediately unlink its shm object, keeping
	 * only the file descriptor as a reference to the object. Note that
	 * the name starts with '/', as POSIX requires for portable shared
	 * memory object names.
	 */
	shmfd = shm_open(tmp_name, O_CREAT | O_EXCL | O_RDWR, 0700);
	if (shmfd < 0) {
		PERROR("shm_open");
		goto error_shm_open;
	}
	ret = shm_unlink(tmp_name);
	if (ret < 0 && errno != ENOENT) {
		PERROR("shm_unlink");
		goto error_shm_release;
	}
	return shmfd;

error_shm_release:
	ret = close(shmfd);
	if (ret) {
		PERROR("close");
	}
error_shm_open:
	return -1;
}
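
/*
 * Illustrative sketch (editorial, with assumed sizes): the open-then-unlink
 * idiom above yields an anonymous, self-cleaning shm object. The fd stays
 * valid after shm_unlink() and the backing memory lives until the last
 * close()/munmap():
 *
 *	int fd = create_posix_shm();
 *
 *	if (fd >= 0 && ftruncate(fd, 4096) == 0) {
 *		void *p = mmap(NULL, 4096, PROT_READ | PROT_WRITE,
 *				MAP_SHARED, fd, 0);
 *		// ... use p; no name is left behind in /dev/shm ...
 *	}
 */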

static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu,
		const struct lttng_credentials *session_credentials)
{
	char shm_path[PATH_MAX];
	int ret;

	if (!channel->shm_path[0]) {
		return create_posix_shm();
	}
	ret = get_stream_shm_path(shm_path, channel->shm_path, cpu);
	if (ret) {
		goto error_shm_path;
	}
	return run_as_open(shm_path,
			O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR,
			session_credentials->uid, session_credentials->gid);

error_shm_path:
	return -1;
}

/*
 * Create a UST channel with the given attributes and send it to the session
 * daemon using the ust ctl API.
 *
 * Return 0 on success or else a negative value.
 */
static int create_ust_channel(struct lttng_consumer_channel *channel,
		struct ustctl_consumer_channel_attr *attr,
		struct ustctl_consumer_channel **ust_chanp)
{
	int ret, nr_stream_fds, i, j;
	int *stream_fds;
	struct ustctl_consumer_channel *ust_channel;

	assert(channel);
	assert(attr);
	assert(ust_chanp);
	assert(channel->buffer_credentials.is_set);

	DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
			"subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
			"switch_timer_interval: %u, read_timer_interval: %u, "
			"output: %d, type: %d", attr->overwrite, attr->subbuf_size,
			attr->num_subbuf, attr->switch_timer_interval,
			attr->read_timer_interval, attr->output, attr->type);

	if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
		nr_stream_fds = 1;
	} else {
		nr_stream_fds = ustctl_get_nr_stream_per_channel();
	}
	stream_fds = zmalloc(nr_stream_fds * sizeof(*stream_fds));
	if (!stream_fds) {
		ret = -1;
		goto error_alloc;
	}
	for (i = 0; i < nr_stream_fds; i++) {
		stream_fds[i] = open_ust_stream_fd(channel, i,
				&channel->buffer_credentials.value);
		if (stream_fds[i] < 0) {
			ret = -1;
			goto error_open;
		}
	}
	ust_channel = ustctl_create_channel(attr, stream_fds, nr_stream_fds);
	if (!ust_channel) {
		ret = -1;
		goto error_create;
	}
	channel->nr_stream_fds = nr_stream_fds;
	channel->stream_fds = stream_fds;
	*ust_chanp = ust_channel;

	return 0;

error_create:
error_open:
	for (j = i - 1; j >= 0; j--) {
		int closeret;

		closeret = close(stream_fds[j]);
		if (closeret) {
			PERROR("close");
		}
		if (channel->shm_path[0]) {
			char shm_path[PATH_MAX];

			closeret = get_stream_shm_path(shm_path,
					channel->shm_path, j);
			if (closeret) {
				ERR("Cannot get stream shm path");
			}
			closeret = run_as_unlink(shm_path,
					channel->buffer_credentials.value.uid,
					channel->buffer_credentials.value.gid);
			if (closeret) {
				PERROR("unlink %s", shm_path);
			}
		}
	}
	/* Try to rmdir all directories under shm_path root. */
	if (channel->root_shm_path[0]) {
		(void) run_as_rmdir_recursive(channel->root_shm_path,
				channel->buffer_credentials.value.uid,
				channel->buffer_credentials.value.gid,
				LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
	}
	free(stream_fds);
error_alloc:
	return ret;
}
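
/*
 * Editorial note (sketch, not upstream documentation): a data channel gets
 * one pre-opened fd per possible stream (ustctl_get_nr_stream_per_channel(),
 * typically one per possible CPU), while a metadata channel only ever needs
 * a single stream, hence the single fd. The error path above unwinds in
 * reverse: close fds [i-1..0], unlink their shm files when a named shm_path
 * is used, then best-effort rmdir of the shm root.
 */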

/*
 * Send a single given stream to the session daemon using the sock.
 *
 * Return 0 on success else a negative value.
 */
static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
{
	int ret;

	assert(stream);
	assert(sock >= 0);

	DBG("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);

	/* Send stream to session daemon. */
	ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
	if (ret < 0) {
		goto error;
	}

error:
	return ret;
}

/*
 * Send channel to sessiond and relayd if applicable.
 *
 * Return 0 on success or else a negative value.
 */
static int send_channel_to_sessiond_and_relayd(int sock,
		struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx, int *relayd_error)
{
	int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_consumer_stream *stream;
	uint64_t net_seq_idx = -1ULL;

	assert(channel);
	assert(ctx);
	assert(sock >= 0);

	DBG("UST consumer sending channel %s to sessiond", channel->name);

	if (channel->relayd_id != (uint64_t) -1ULL) {
		cds_list_for_each_entry(stream, &channel->streams.head, send_node) {

			health_code_update();

			/* Try to send the stream to the relayd if one is available. */
			DBG("Sending stream %" PRIu64 " of channel \"%s\" to relayd",
					stream->key, channel->name);
			ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
			if (ret < 0) {
				/*
				 * Flag that the relayd was the problem here, probably due
				 * to a communication error on the socket.
				 */
				if (relayd_error) {
					*relayd_error = 1;
				}
				ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
			}
			if (net_seq_idx == -1ULL) {
				net_seq_idx = stream->net_seq_idx;
			}
		}
	}

	/* Inform sessiond that we are about to send channel and streams. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
		/*
		 * Either the session daemon is not responding or the relayd died so we
		 * stop now.
		 */
		goto error;
	}

	/* Send channel to sessiond. */
	ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
	if (ret < 0) {
		goto error;
	}

	ret = ustctl_channel_close_wakeup_fd(channel->uchan);
	if (ret < 0) {
		goto error;
	}

	/* The channel was sent successfully to the sessiond at this point. */
	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {

		health_code_update();

		/* Send stream to session daemon. */
		ret = send_sessiond_stream(sock, stream);
		if (ret < 0) {
			goto error;
		}
	}

	/* Tell sessiond there are no more streams. */
	ret = ustctl_send_stream_to_sessiond(sock, NULL);
	if (ret < 0) {
		goto error;
	}

	DBG("UST consumer NULL stream sent to sessiond");

	return 0;

error:
	if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
		ret = -1;
	}
	return ret;
}
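
/*
 * Wire sequence recap (editorial sketch): on GET_CHANNEL the consumer sends,
 * in order: a status message, the channel object, every stream object, and
 * finally a NULL stream as an end-of-streams sentinel. The sessiond relies
 * on that sentinel to know the channel is complete.
 */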

/*
 * Creates a channel and its streams and adds the channel to the internal
 * consumer state. The created streams must ONLY be sent once the GET_CHANNEL
 * command is received.
 *
 * Return 0 on success or else, a negative value is returned and the channel
 * MUST be destroyed by consumer_del_channel().
 */
static int ask_channel(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_channel *channel,
		struct ustctl_consumer_channel_attr *attr)
{
	int ret;

	assert(ctx);
	assert(channel);
	assert(attr);

	/*
	 * This value is still used by the kernel consumer since, for the kernel,
	 * the stream ownership is not IN the consumer, so we need the number of
	 * streams left to initialize in order to know when to delete the channel
	 * (see consumer.c).
	 *
	 * As for the user space tracer, the consumer creates and sends the
	 * streams to the session daemon, which only sends them to the application
	 * once every stream of a channel has been received, making this value
	 * useless since the streams will be added to the poll thread before the
	 * application receives them. This ensures that a stream can not hang up
	 * during initialization of a channel.
	 */
	channel->nb_init_stream_left = 0;

	/* The reply msg status is handled in the following call. */
	ret = create_ust_channel(channel, attr, &channel->uchan);
	if (ret < 0) {
		goto end;
	}

	channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);

	/*
	 * For the snapshots (no monitor), we create the metadata streams
	 * on demand, not during the channel creation.
	 */
	if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && !channel->monitor) {
		ret = 0;
		goto end;
	}

	/* Open all streams for this channel. */
	pthread_mutex_lock(&channel->lock);
	ret = create_ust_streams(channel, ctx);
	pthread_mutex_unlock(&channel->lock);
	if (ret < 0) {
		goto end;
	}

end:
	return ret;
}

/*
 * Send all streams of a channel to the right thread handling them.
 *
 * On error, return a negative value else 0 on success.
 */
static int send_streams_to_thread(struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx)
{
	int ret = 0;
	struct lttng_consumer_stream *stream, *stmp;

	assert(channel);
	assert(ctx);

	/* Send streams to the corresponding thread. */
	cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
			send_node) {

		health_code_update();

		/* Sending the stream to the thread. */
		ret = send_stream_to_thread(stream, ctx);
		if (ret < 0) {
			/*
			 * If we are unable to send the stream to the thread, there is
			 * a big problem so just stop everything.
			 */
			goto error;
		}
	}

error:
	return ret;
}

/*
 * Flush channel's streams using the given key to retrieve the channel.
 *
 * Return 0 on success else an LTTng error code.
 */
static int flush_channel(uint64_t chan_key)
{
	int ret = 0;
	struct lttng_consumer_channel *channel;
	struct lttng_consumer_stream *stream;
	struct lttng_ht *ht;
	struct lttng_ht_iter iter;

	DBG("UST consumer flush channel key %" PRIu64, chan_key);

	rcu_read_lock();
	channel = consumer_find_channel(chan_key);
	if (!channel) {
		ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto error;
	}

	ht = consumer_data.stream_per_chan_id_ht;

	/* For each stream of the channel id, flush it. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
			&channel->key, &iter.iter, stream, node_channel_id.node) {

		health_code_update();

		pthread_mutex_lock(&stream->lock);

		/*
		 * Protect against concurrent teardown of a stream.
		 */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}

		if (!stream->quiescent) {
			ustctl_flush_buffer(stream->ustream, 0);
			stream->quiescent = true;
		}
next:
		pthread_mutex_unlock(&stream->lock);
	}
error:
	rcu_read_unlock();
	return ret;
}

/*
 * Clear quiescent state from channel's streams using the given key to
 * retrieve the channel.
 *
 * Return 0 on success else an LTTng error code.
 */
static int clear_quiescent_channel(uint64_t chan_key)
{
	int ret = 0;
	struct lttng_consumer_channel *channel;
	struct lttng_consumer_stream *stream;
	struct lttng_ht *ht;
	struct lttng_ht_iter iter;

	DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);

	rcu_read_lock();
	channel = consumer_find_channel(chan_key);
	if (!channel) {
		ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto error;
	}

	ht = consumer_data.stream_per_chan_id_ht;

	/* For each stream of the channel id, clear quiescent state. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
			&channel->key, &iter.iter, stream, node_channel_id.node) {

		health_code_update();

		pthread_mutex_lock(&stream->lock);
		stream->quiescent = false;
		pthread_mutex_unlock(&stream->lock);
	}
error:
	rcu_read_unlock();
	return ret;
}

/*
 * Close metadata stream wakeup_fd using the given key to retrieve the channel.
 *
 * Return 0 on success else an LTTng error code.
 */
static int close_metadata(uint64_t chan_key)
{
	int ret = 0;
	struct lttng_consumer_channel *channel;
	unsigned int channel_monitor;

	DBG("UST consumer close metadata key %" PRIu64, chan_key);

	channel = consumer_find_channel(chan_key);
	if (!channel) {
		/*
		 * This is possible if the metadata thread has issued a delete
		 * because the endpoint of the stream hung up. There is no way the
		 * session daemon can know about it, thus use a DBG instead of an
		 * actual error.
		 */
		DBG("UST consumer close metadata %" PRIu64 " not found", chan_key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto error;
	}

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);
	channel_monitor = channel->monitor;
	if (cds_lfht_is_node_deleted(&channel->node.node)) {
		goto error_unlock;
	}

	lttng_ustconsumer_close_metadata(channel);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	/*
	 * The ownership of a metadata channel depends on the type of
	 * session to which it belongs. In effect, the monitor flag is checked
	 * to determine if this metadata channel is in "snapshot" mode or not.
	 *
	 * In the non-snapshot case, the metadata channel is created along with
	 * a single stream which will remain present until the metadata channel
	 * is destroyed (on the destruction of its session). In this case, the
	 * metadata stream is "monitored" by the metadata poll thread, which
	 * holds the ownership of its channel.
	 *
	 * Closing the metadata will cause the metadata stream's "metadata poll
	 * pipe" to be closed. Closing this pipe will wake up the metadata poll
	 * thread, which will tear down the metadata stream which, in turn,
	 * deletes the metadata channel.
	 *
	 * In the snapshot case, the metadata stream is created and destroyed
	 * on every snapshot record. Since the channel doesn't have an owner
	 * other than the session daemon, it is safe to destroy it immediately
	 * on reception of the CLOSE_METADATA command.
	 */
	if (!channel_monitor) {
		/*
		 * The channel and consumer_data locks must be
		 * released before this call since consumer_del_channel
		 * re-acquires the channel and consumer_data locks to teardown
		 * the channel and queue its reclamation by the "call_rcu"
		 * worker thread.
		 */
		consumer_del_channel(channel);
	}

	return ret;
error_unlock:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);
error:
	return ret;
}

/*
 * RCU read side lock MUST be acquired before calling this function.
 *
 * Return 0 on success else an LTTng error code.
 */
static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
{
	int ret;
	struct lttng_consumer_channel *metadata;

	DBG("UST consumer setup metadata key %" PRIu64, key);

	metadata = consumer_find_channel(key);
	if (!metadata) {
		ERR("UST consumer push metadata %" PRIu64 " not found", key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto end;
	}

	/*
	 * In no monitor mode, the metadata channel has no stream(s) so skip the
	 * ownership transfer to the metadata thread.
	 */
	if (!metadata->monitor) {
		DBG("Metadata channel in no monitor");
		ret = 0;
		goto end;
	}

	/*
	 * Send metadata stream to relayd if one is available. Availability is
	 * known if the stream is still in the list of the channel.
	 */
	if (cds_list_empty(&metadata->streams.head)) {
		ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
		ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
		goto error_no_stream;
	}

	/* Send metadata stream to relayd if needed. */
	if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
		ret = consumer_send_relayd_stream(metadata->metadata_stream,
				metadata->pathname);
		if (ret < 0) {
			ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
			goto error;
		}
		ret = consumer_send_relayd_streams_sent(
				metadata->metadata_stream->net_seq_idx);
		if (ret < 0) {
			ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
			goto error;
		}
	}

	/*
	 * Ownership of the metadata stream is passed along. Freeing is handled
	 * by the callee.
	 */
	ret = send_streams_to_thread(metadata, ctx);
	if (ret < 0) {
		/*
		 * If we are unable to send the stream to the thread, there is
		 * a big problem so just stop everything.
		 */
		ret = LTTCOMM_CONSUMERD_FATAL;
		goto send_streams_error;
	}
	/* List MUST be empty after this or else it could be reused. */
	assert(cds_list_empty(&metadata->streams.head));

	ret = 0;
	goto end;

error:
	/*
	 * Delete the metadata channel on error. At this point, the metadata
	 * stream can NOT be monitored by the metadata thread, thus we have the
	 * guarantee that the stream is still in the local stream list of the
	 * channel. This call will make sure to clean that list.
	 */
	consumer_stream_destroy(metadata->metadata_stream, NULL);
	cds_list_del(&metadata->metadata_stream->send_node);
	metadata->metadata_stream = NULL;
send_streams_error:
error_no_stream:
end:
	return ret;
}

/*
 * Snapshot the whole metadata.
 * RCU read-side lock must be held by the caller.
 *
 * Returns 0 on success, < 0 on error
 */
static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
		uint64_t key, char *path, uint64_t relayd_id,
		struct lttng_consumer_local_data *ctx)
{
	int ret = 0;
	struct lttng_consumer_stream *metadata_stream;

	assert(path);
	assert(ctx);

	DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
			key, path);

	rcu_read_lock();

	assert(!metadata_channel->monitor);

	health_code_update();

	/*
	 * Ask the sessiond if we have new metadata waiting and update the
	 * consumer metadata cache.
	 */
	ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
	if (ret < 0) {
		goto error;
	}

	health_code_update();

	/*
	 * The metadata stream is NOT created in no monitor mode when the channel
	 * is created by a sessiond ask channel command.
	 */
	ret = create_ust_streams(metadata_channel, ctx);
	if (ret < 0) {
		goto error;
	}

	metadata_stream = metadata_channel->metadata_stream;
	assert(metadata_stream);

	pthread_mutex_lock(&metadata_stream->lock);
	if (relayd_id != (uint64_t) -1ULL) {
		metadata_stream->net_seq_idx = relayd_id;
		ret = consumer_send_relayd_stream(metadata_stream, path);
	} else {
		ret = consumer_stream_create_output_files(metadata_stream,
				false);
	}
	pthread_mutex_unlock(&metadata_stream->lock);
	if (ret < 0) {
		goto error_stream;
	}

	do {
		health_code_update();

		ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
		if (ret < 0) {
			goto error_stream;
		}
	} while (ret > 0);

error_stream:
	/*
	 * Clean up the stream completely because the next snapshot will use a
	 * new metadata stream.
	 */
	consumer_stream_destroy(metadata_stream, NULL);
	cds_list_del(&metadata_stream->send_node);
	metadata_channel->metadata_stream = NULL;

error:
	rcu_read_unlock();
	return ret;
}

static
int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
		const char **addr)
{
	int ret;
	unsigned long mmap_offset;
	const char *mmap_base;

	mmap_base = ustctl_get_mmap_base(stream->ustream);
	if (!mmap_base) {
		ERR("Failed to get mmap base for stream `%s`",
				stream->name);
		ret = -EPERM;
		goto error;
	}

	ret = ustctl_get_mmap_read_offset(stream->ustream, &mmap_offset);
	if (ret != 0) {
		ERR("Failed to get mmap offset for stream `%s`", stream->name);
		ret = -EINVAL;
		goto error;
	}

	*addr = mmap_base + mmap_offset;
error:
	return ret;
}
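
/*
 * Editorial sketch (assumed values, for illustration only): the address of
 * the sub-buffer currently being read is simply base + offset. For instance,
 * with a read offset of one 4 KiB sub-buffer into the mapped ring buffer:
 *
 *	const char *addr;
 *
 *	if (get_current_subbuf_addr(stream, &addr) == 0) {
 *		// addr == mmap_base + 4096 in this example
 *	}
 */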

/*
 * Take a snapshot of all the streams of a channel.
 * RCU read-side lock and the channel lock must be held by the caller.
 *
 * Returns 0 on success, < 0 on error
 */
static int snapshot_channel(struct lttng_consumer_channel *channel,
		uint64_t key, char *path, uint64_t relayd_id,
		uint64_t nb_packets_per_stream,
		struct lttng_consumer_local_data *ctx)
{
	int ret;
	unsigned use_relayd = 0;
	unsigned long consumed_pos, produced_pos;
	struct lttng_consumer_stream *stream;

	assert(path);
	assert(ctx);

	rcu_read_lock();

	if (relayd_id != (uint64_t) -1ULL) {
		use_relayd = 1;
	}

	assert(!channel->monitor);
	DBG("UST consumer snapshot channel %" PRIu64, key);

	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
		health_code_update();

		/* Lock stream because we are about to change its state. */
		pthread_mutex_lock(&stream->lock);
		assert(channel->trace_chunk);
		if (!lttng_trace_chunk_get(channel->trace_chunk)) {
			/*
			 * Can't happen barring an internal error as the channel
			 * holds a reference to the trace chunk.
			 */
			ERR("Failed to acquire reference to channel's trace chunk");
			ret = -1;
			goto error_unlock;
		}
		assert(!stream->trace_chunk);
		stream->trace_chunk = channel->trace_chunk;

		stream->net_seq_idx = relayd_id;

		if (use_relayd) {
			ret = consumer_send_relayd_stream(stream, path);
			if (ret < 0) {
				goto error_unlock;
			}
		} else {
			ret = consumer_stream_create_output_files(stream,
					false);
			if (ret < 0) {
				goto error_unlock;
			}
			DBG("UST consumer snapshot stream (%" PRIu64 ")",
					stream->key);
		}

		/*
		 * If tracing is active, we want to perform a "full" buffer flush.
		 * Else, if quiescent, it has already been done by the prior stop.
		 */
		if (!stream->quiescent) {
			ustctl_flush_buffer(stream->ustream, 0);
		}

		ret = lttng_ustconsumer_take_snapshot(stream);
		if (ret < 0) {
			ERR("Taking UST snapshot");
			goto error_unlock;
		}

		ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
		if (ret < 0) {
			ERR("Produced UST snapshot position");
			goto error_unlock;
		}

		ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
		if (ret < 0) {
			ERR("Consumed UST snapshot position");
			goto error_unlock;
		}

		/*
		 * The original value is sent back if the max stream size is larger
		 * than the possible size of the snapshot. Also, we assume that the
		 * session daemon should never send a maximum stream size that is
		 * lower than the subbuffer size.
		 */
		consumed_pos = consumer_get_consume_start_pos(consumed_pos,
				produced_pos, nb_packets_per_stream,
				stream->max_sb_size);

		while ((long) (consumed_pos - produced_pos) < 0) {
			ssize_t read_len;
			unsigned long len, padded_len;
			const char *subbuf_addr;
			struct lttng_buffer_view subbuf_view;

			health_code_update();

			DBG("UST consumer taking snapshot at pos %lu", consumed_pos);

			ret = ustctl_get_subbuf(stream->ustream, &consumed_pos);
			if (ret < 0) {
				if (ret != -EAGAIN) {
					PERROR("ustctl_get_subbuf snapshot");
					goto error_close_stream;
				}
				DBG("UST consumer get subbuf failed. Skipping it.");
				consumed_pos += stream->max_sb_size;
				stream->chan->lost_packets++;
				continue;
			}

			ret = ustctl_get_subbuf_size(stream->ustream, &len);
			if (ret < 0) {
				ERR("Snapshot ustctl_get_subbuf_size");
				goto error_put_subbuf;
			}

			ret = ustctl_get_padded_subbuf_size(stream->ustream, &padded_len);
			if (ret < 0) {
				ERR("Snapshot ustctl_get_padded_subbuf_size");
				goto error_put_subbuf;
			}

			ret = get_current_subbuf_addr(stream, &subbuf_addr);
			if (ret) {
				goto error_put_subbuf;
			}

			subbuf_view = lttng_buffer_view_init(
					subbuf_addr, 0, padded_len);
			read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
					stream, &subbuf_view, padded_len - len,
					NULL);
			if (use_relayd) {
				if (read_len != len) {
					ret = -EPERM;
					goto error_put_subbuf;
				}
			} else {
				if (read_len != padded_len) {
					ret = -EPERM;
					goto error_put_subbuf;
				}
			}

			ret = ustctl_put_subbuf(stream->ustream);
			if (ret < 0) {
				ERR("Snapshot ustctl_put_subbuf");
				goto error_close_stream;
			}
			consumed_pos += stream->max_sb_size;
		}

		/* Simply close the stream so we can use it on the next snapshot. */
		consumer_stream_close(stream);
		pthread_mutex_unlock(&stream->lock);
	}

	rcu_read_unlock();
	return 0;

error_put_subbuf:
	if (ustctl_put_subbuf(stream->ustream) < 0) {
		ERR("Snapshot ustctl_put_subbuf");
	}
error_close_stream:
	consumer_stream_close(stream);
error_unlock:
	pthread_mutex_unlock(&stream->lock);
	rcu_read_unlock();
	return ret;
}

/*
 * Receive the metadata updates from the sessiond. Supports receiving
 * overlapping metadata, but it needs to always belong to a contiguous
 * range starting from offset 0.
 * Be careful about the locks held when calling this function: it needs
 * the metadata cache flush to concurrently progress in order to
 * complete.
 */
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
		uint64_t len, uint64_t version,
		struct lttng_consumer_channel *channel, int timer, int wait)
{
	int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	char *metadata_str;

	DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);

	metadata_str = zmalloc(len * sizeof(char));
	if (!metadata_str) {
		PERROR("zmalloc metadata string");
		ret_code = LTTCOMM_CONSUMERD_ENOMEM;
		goto end;
	}

	health_code_update();

	/* Receive metadata string. */
	ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
	if (ret < 0) {
		/* Session daemon is dead so return gracefully. */
		ret_code = ret;
		goto end_free;
	}

	health_code_update();

	pthread_mutex_lock(&channel->metadata_cache->lock);
	ret = consumer_metadata_cache_write(channel, offset, len, version,
			metadata_str);
	if (ret < 0) {
		/* Unable to handle metadata. Notify session daemon. */
		ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
		/*
		 * Skip the metadata flush on write error since the offset and len
		 * might not have been updated, which could create an infinite loop
		 * below when waiting for the metadata cache to be flushed.
		 */
		pthread_mutex_unlock(&channel->metadata_cache->lock);
		goto end_free;
	}
	pthread_mutex_unlock(&channel->metadata_cache->lock);

	if (!wait) {
		goto end_free;
	}
	while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
		DBG("Waiting for metadata to be flushed");

		health_code_update();

		usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
	}

end_free:
	free(metadata_str);
end:
	return ret_code;
}
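
/*
 * Contiguity example (editorial, with assumed numbers): if the cache already
 * holds bytes [0, 100), an update covering [80, 130) is accepted since the
 * resulting range [0, 130) is still contiguous from offset 0, whereas an
 * update starting at offset 120 would leave a hole and violate the contract
 * stated above.
 */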

/*
 * Receive command from session daemon and process it.
 *
 * Return 1 on success else a negative value or 0.
 */
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
		int sock, struct pollfd *consumer_sockpoll)
{
	ssize_t ret;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttcomm_consumer_msg msg;
	struct lttng_consumer_channel *channel = NULL;

	health_code_update();

	ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
	if (ret != sizeof(msg)) {
		DBG("Consumer received unexpected message size %zd (expects %zu)",
				ret, sizeof(msg));
		/*
		 * The ret value might be 0 meaning an orderly shutdown but this is ok
		 * since the caller handles this.
		 */
		if (ret > 0) {
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
			ret = -1;
		}
		return ret;
	}

	health_code_update();

	/* deprecated */
	assert(msg.cmd_type != LTTNG_CONSUMER_STOP);

	health_code_update();

	/* relayd needs RCU read-side lock */
	rcu_read_lock();

	switch (msg.cmd_type) {
	case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
	{
		/* Session daemon status messages are handled in the following call. */
		consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
				msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
				&msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
				msg.u.relayd_sock.relayd_session_id);
		goto end_nosignal;
	}
	case LTTNG_CONSUMER_DESTROY_RELAYD:
	{
		uint64_t index = msg.u.destroy_relayd.net_seq_idx;
		struct consumer_relayd_sock_pair *relayd;

		DBG("UST consumer destroying relayd %" PRIu64, index);

		/* Get relayd reference if exists. */
		relayd = consumer_find_relayd(index);
		if (relayd == NULL) {
			DBG("Unable to find relayd %" PRIu64, index);
			ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
		}

		/*
		 * Each relayd socket pair has a refcount of streams attached to it,
		 * which tells whether the relayd is still active or not depending on
		 * the refcount value.
		 *
		 * This will set the destroy flag of the relayd object and destroy it
		 * if the refcount reaches zero when called.
		 *
		 * The destroy can happen either here or when a stream fd hangs up.
		 */
		if (relayd) {
			consumer_flag_relayd_for_destroy(relayd);
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_UPDATE_STREAM:
	{
		rcu_read_unlock();
		return -ENOSYS;
	}
	case LTTNG_CONSUMER_DATA_PENDING:
	{
		int ret, is_data_pending;
		uint64_t id = msg.u.data_pending.session_id;

		DBG("UST consumer data pending command for id %" PRIu64, id);

		is_data_pending = consumer_data_pending(id);

		/* Send back returned value to session daemon */
		ret = lttcomm_send_unix_sock(sock, &is_data_pending,
				sizeof(is_data_pending));
		if (ret < 0) {
			DBG("Error when sending the data pending ret code: %d", ret);
			goto error_fatal;
		}

		/*
		 * No need to send back a status message since the data pending
		 * returned value is the response.
		 */
		break;
	}
	case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
	{
		int ret;
		struct ustctl_consumer_channel_attr attr;
		const uint64_t chunk_id = msg.u.ask_channel.chunk_id.value;
		const struct lttng_credentials buffer_credentials = {
			.uid = msg.u.ask_channel.buffer_credentials.uid,
			.gid = msg.u.ask_channel.buffer_credentials.gid,
		};

		/* Create a plain object and reserve a channel key. */
		channel = allocate_channel(msg.u.ask_channel.session_id,
				msg.u.ask_channel.chunk_id.is_set ?
						&chunk_id : NULL,
				msg.u.ask_channel.pathname,
				msg.u.ask_channel.name,
				msg.u.ask_channel.relayd_id,
				msg.u.ask_channel.key,
				(enum lttng_event_output) msg.u.ask_channel.output,
				msg.u.ask_channel.tracefile_size,
				msg.u.ask_channel.tracefile_count,
				msg.u.ask_channel.session_id_per_pid,
				msg.u.ask_channel.monitor,
				msg.u.ask_channel.live_timer_interval,
				msg.u.ask_channel.root_shm_path,
				msg.u.ask_channel.shm_path);
		if (!channel) {
			goto end_channel_error;
		}

		LTTNG_OPTIONAL_SET(&channel->buffer_credentials,
				buffer_credentials);

		/*
		 * Assign the UST application UID to the channel. This value is
		 * ignored for per-PID buffers. This is specific to UST, thus it is
		 * set after the allocation.
		 */
		channel->ust_app_uid = msg.u.ask_channel.ust_app_uid;

		/* Build channel attributes from received message. */
		attr.subbuf_size = msg.u.ask_channel.subbuf_size;
		attr.num_subbuf = msg.u.ask_channel.num_subbuf;
		attr.overwrite = msg.u.ask_channel.overwrite;
		attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
		attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
		attr.chan_id = msg.u.ask_channel.chan_id;
		memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
		attr.blocking_timeout = msg.u.ask_channel.blocking_timeout;

		/* Match channel buffer type to the UST abi. */
		switch (msg.u.ask_channel.output) {
		case LTTNG_EVENT_MMAP:
		default:
			attr.output = LTTNG_UST_MMAP;
			break;
		}

		/* Translate and save channel type. */
		switch (msg.u.ask_channel.type) {
		case LTTNG_UST_CHAN_PER_CPU:
			channel->type = CONSUMER_CHANNEL_TYPE_DATA;
			attr.type = LTTNG_UST_CHAN_PER_CPU;
			/*
			 * Set refcount to 1 for owner. Below, we will
			 * pass ownership to the
			 * consumer_thread_channel_poll() thread.
			 */
			channel->refcount = 1;
			break;
		case LTTNG_UST_CHAN_METADATA:
			channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
			attr.type = LTTNG_UST_CHAN_METADATA;
			break;
		default:
			assert(0);
			goto error_fatal;
		}

		health_code_update();

		ret = ask_channel(ctx, channel, &attr);
		if (ret < 0) {
			goto end_channel_error;
		}

		if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
			ret = consumer_metadata_cache_allocate(channel);
			if (ret < 0) {
				ERR("Allocating metadata cache");
				goto end_channel_error;
			}
			consumer_timer_switch_start(channel, attr.switch_timer_interval);
			attr.switch_timer_interval = 0;
		} else {
			int monitor_start_ret;

			consumer_timer_live_start(channel,
					msg.u.ask_channel.live_timer_interval);
			monitor_start_ret = consumer_timer_monitor_start(
					channel,
					msg.u.ask_channel.monitor_timer_interval);
			if (monitor_start_ret < 0) {
				ERR("Starting channel monitoring timer failed");
				goto end_channel_error;
			}
		}

		health_code_update();

		/*
		 * Add the channel to the internal state AFTER all streams were created
		 * and successfully sent to the session daemon. This way, all streams
		 * must be ready before this channel is visible to the threads.
		 * If add_channel succeeds, ownership of the channel is
		 * passed to consumer_thread_channel_poll().
		 */
		ret = add_channel(channel, ctx);
		if (ret < 0) {
			if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
				if (channel->switch_timer_enabled == 1) {
					consumer_timer_switch_stop(channel);
				}
				consumer_metadata_cache_destroy(channel);
			}
			if (channel->live_timer_enabled == 1) {
				consumer_timer_live_stop(channel);
			}
			if (channel->monitor_timer_enabled == 1) {
				consumer_timer_monitor_stop(channel);
			}
			goto end_channel_error;
		}

		health_code_update();

		/*
		 * Channel and streams are now created. Inform the session daemon that
		 * everything went well and that it should wait to receive the channel
		 * and streams with the ustctl API.
		 */
		ret = consumer_send_status_channel(sock, channel);
		if (ret < 0) {
			/*
			 * There is probably a problem on the socket.
			 */
			goto error_fatal;
		}

		break;
	}
	case LTTNG_CONSUMER_GET_CHANNEL:
	{
		int ret, relayd_err = 0;
		uint64_t key = msg.u.get_channel.key;
		struct lttng_consumer_channel *channel;

		channel = consumer_find_channel(key);
		if (!channel) {
			ERR("UST consumer get channel key %" PRIu64 " not found", key);
			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
			goto end_get_channel;
		}

		health_code_update();

		/* Send the channel to sessiond (and relayd, if applicable). */
		ret = send_channel_to_sessiond_and_relayd(sock, channel, ctx,
				&relayd_err);
		if (ret < 0) {
			if (relayd_err) {
				/*
				 * We were unable to send the stream to the relayd, so avoid
				 * sending back a fatal error to the thread since this is OK
				 * and the consumer can continue its work. The above call
				 * has sent the error status message to the sessiond.
				 */
				goto end_get_channel_nosignal;
			}
			/*
			 * The communication was broken, hence there is a bad state
			 * between the consumer and sessiond so stop everything.
			 */
			goto error_get_channel_fatal;
		}

		health_code_update();

		/*
		 * In no monitor mode, the streams' ownership is kept inside the
		 * channel so don't send them to the data thread.
		 */
		if (!channel->monitor) {
			goto end_get_channel;
		}

		ret = send_streams_to_thread(channel, ctx);
		if (ret < 0) {
			/*
			 * If we are unable to send the stream to the thread, there is
			 * a big problem so just stop everything.
			 */
			goto error_get_channel_fatal;
		}
		/* List MUST be empty after this or else it could be reused. */
		assert(cds_list_empty(&channel->streams.head));
end_get_channel:
		goto end_msg_sessiond;
error_get_channel_fatal:
		goto error_fatal;
end_get_channel_nosignal:
		goto end_nosignal;
	}
	case LTTNG_CONSUMER_DESTROY_CHANNEL:
	{
		uint64_t key = msg.u.destroy_channel.key;

		/*
		 * Only called if streams have not been sent to the stream
		 * manager thread. However, the channel has been sent to the
		 * channel manager thread.
		 */
		notify_thread_del_channel(ctx, key);
		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_CLOSE_METADATA:
	{
		int ret;

		ret = close_metadata(msg.u.close_metadata.key);
		if (ret != 0) {
			ret_code = ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_FLUSH_CHANNEL:
	{
		int ret;

		ret = flush_channel(msg.u.flush_channel.key);
		if (ret != 0) {
			ret_code = ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
	{
		int ret;

		ret = clear_quiescent_channel(
				msg.u.clear_quiescent_channel.key);
		if (ret != 0) {
			ret_code = ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_PUSH_METADATA:
	{
		int ret;
		uint64_t len = msg.u.push_metadata.len;
		uint64_t key = msg.u.push_metadata.key;
		uint64_t offset = msg.u.push_metadata.target_offset;
		uint64_t version = msg.u.push_metadata.version;
		struct lttng_consumer_channel *channel;

		DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key,
				len);

		channel = consumer_find_channel(key);
		if (!channel) {
			/*
			 * This is possible if the metadata creation on the consumer side
			 * is in flight vis-a-vis a concurrent push metadata from the
			 * session daemon. Simply return that the channel failed and the
			 * session daemon will handle that message correctly considering
			 * that this race is acceptable, thus the DBG() statement here.
			 */
			DBG("UST consumer push metadata %" PRIu64 " not found", key);
			ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
			goto end_push_metadata_msg_sessiond;
		}

		health_code_update();

		if (!len) {
			/*
			 * There is nothing to receive. We have simply
			 * checked whether the channel can be found.
			 */
			ret_code = LTTCOMM_CONSUMERD_SUCCESS;
			goto end_push_metadata_msg_sessiond;
		}

		/* Tell session daemon we are ready to receive the metadata. */
		ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
		if (ret < 0) {
			/* Somehow, the session daemon is not responding anymore. */
			goto error_push_metadata_fatal;
		}

		health_code_update();

		/* Wait for more data. */
		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret) {
			goto error_push_metadata_fatal;
		}

		health_code_update();

		ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
				len, version, channel, 0, 1);
		if (ret < 0) {
			/* error receiving from sessiond */
			goto error_push_metadata_fatal;
		} else {
			ret_code = ret;
			goto end_push_metadata_msg_sessiond;
		}
end_push_metadata_msg_sessiond:
		goto end_msg_sessiond;
error_push_metadata_fatal:
		goto error_fatal;
	}
	case LTTNG_CONSUMER_SETUP_METADATA:
	{
		int ret;

		ret = setup_metadata(ctx, msg.u.setup_metadata.key);
		if (ret) {
			ret_code = ret;
		}
		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
	{
		struct lttng_consumer_channel *channel;
		uint64_t key = msg.u.snapshot_channel.key;

		channel = consumer_find_channel(key);
		if (!channel) {
			DBG("UST snapshot channel not found for key %" PRIu64, key);
			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
		} else {
			if (msg.u.snapshot_channel.metadata) {
				ret = snapshot_metadata(channel, key,
						msg.u.snapshot_channel.pathname,
						msg.u.snapshot_channel.relayd_id,
						ctx);
				if (ret < 0) {
					ERR("Snapshot metadata failed");
					ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
				}
			} else {
				ret = snapshot_channel(channel, key,
						msg.u.snapshot_channel.pathname,
						msg.u.snapshot_channel.relayd_id,
						msg.u.snapshot_channel.nb_packets_per_stream,
						ctx);
				if (ret < 0) {
					ERR("Snapshot channel failed");
					ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
				}
			}
		}
		health_code_update();
		ret = consumer_send_status_msg(sock, ret_code);
		if (ret < 0) {
			/* Somehow, the session daemon is not responding anymore. */
			goto end_nosignal;
		}
		health_code_update();
		break;
	}
	case LTTNG_CONSUMER_DISCARDED_EVENTS:
	{
		int ret = 0;
		uint64_t discarded_events;
		struct lttng_ht_iter iter;
		struct lttng_ht *ht;
		struct lttng_consumer_stream *stream;
		uint64_t id = msg.u.discarded_events.session_id;
		uint64_t key = msg.u.discarded_events.channel_key;

		DBG("UST consumer discarded events command for session id %"
				PRIu64, id);
		rcu_read_lock();
		pthread_mutex_lock(&consumer_data.lock);

		ht = consumer_data.stream_list_ht;

		/*
		 * We only need a reference to the channel, but channels are not
		 * directly indexed, so we just use the first matching stream to
		 * extract the information we need. We default to 0 if not found
		 * (no events are dropped if the channel is not yet in use).
		 */
		discarded_events = 0;
		cds_lfht_for_each_entry_duplicate(ht->ht,
				ht->hash_fct(&id, lttng_ht_seed),
				ht->match_fct, &id,
				&iter.iter, stream, node_session_id.node) {
			if (stream->chan->key == key) {
				discarded_events = stream->chan->discarded_events;
				break;
			}
		}
		pthread_mutex_unlock(&consumer_data.lock);
		rcu_read_unlock();

		DBG("UST consumer discarded events command for session id %"
				PRIu64 ", channel key %" PRIu64, id, key);

		health_code_update();

		/* Send back returned value to session daemon */
		ret = lttcomm_send_unix_sock(sock, &discarded_events, sizeof(discarded_events));
		if (ret < 0) {
			PERROR("send discarded events");
			goto error_fatal;
		}

		break;
	}
	case LTTNG_CONSUMER_LOST_PACKETS:
	{
		int ret;
		uint64_t lost_packets;
		struct lttng_ht_iter iter;
		struct lttng_ht *ht;
		struct lttng_consumer_stream *stream;
		uint64_t id = msg.u.lost_packets.session_id;
		uint64_t key = msg.u.lost_packets.channel_key;

		DBG("UST consumer lost packets command for session id %"
				PRIu64, id);
		rcu_read_lock();
		pthread_mutex_lock(&consumer_data.lock);

		ht = consumer_data.stream_list_ht;

		/*
		 * We only need a reference to the channel, but channels are not
		 * directly indexed, so we just use the first matching stream to
		 * extract the information we need. We default to 0 if not found
		 * (no packets are lost if the channel is not yet in use).
		 */
		lost_packets = 0;
		cds_lfht_for_each_entry_duplicate(ht->ht,
				ht->hash_fct(&id, lttng_ht_seed),
				ht->match_fct, &id,
				&iter.iter, stream, node_session_id.node) {
			if (stream->chan->key == key) {
				lost_packets = stream->chan->lost_packets;
				break;
			}
		}
		pthread_mutex_unlock(&consumer_data.lock);
		rcu_read_unlock();

		DBG("UST consumer lost packets command for session id %"
				PRIu64 ", channel key %" PRIu64, id, key);

		health_code_update();

		/* Send back returned value to session daemon */
		ret = lttcomm_send_unix_sock(sock, &lost_packets,
				sizeof(lost_packets));
		if (ret < 0) {
			PERROR("send lost packets");
			goto error_fatal;
		}

		break;
	}
1953 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1954 {
1955 int channel_monitor_pipe;
1956
1957 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1958 /* Successfully received the command's type. */
1959 ret = consumer_send_status_msg(sock, ret_code);
1960 if (ret < 0) {
1961 goto error_fatal;
1962 }
1963
1964 ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
1965 1);
1966 if (ret != sizeof(channel_monitor_pipe)) {
1967 ERR("Failed to receive channel monitor pipe");
1968 goto error_fatal;
1969 }
1970
1971 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
1972 ret = consumer_timer_thread_set_channel_monitor_pipe(
1973 channel_monitor_pipe);
1974 if (!ret) {
1975 int flags;
1976
1977 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1978 /* Set the pipe as non-blocking. */
1979 ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
1980 if (ret == -1) {
1981 PERROR("fcntl get flags of the channel monitoring pipe");
1982 goto error_fatal;
1983 }
1984 flags = ret;
1985
1986 ret = fcntl(channel_monitor_pipe, F_SETFL,
1987 flags | O_NONBLOCK);
1988 if (ret == -1) {
1989 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1990 goto error_fatal;
1991 }
1992 DBG("Channel monitor pipe set as non-blocking");
1993 } else {
1994 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1995 }
1996 goto end_msg_sessiond;
1997 }
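/*
 * The F_GETFL/F_SETFL sequence above is the standard way to add O_NONBLOCK
 * without clobbering the other file status flags. A minimal sketch of a
 * hypothetical reusable helper (illustrative only, not part of the original
 * source):
 *
 *	static int set_fd_nonblock(int fd)
 *	{
 *		int flags = fcntl(fd, F_GETFL, 0);
 *
 *		if (flags == -1) {
 *			return -1;
 *		}
 *		return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
 *	}
 */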
1998 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1999 {
2000 struct lttng_consumer_channel *channel;
2001 uint64_t key = msg.u.rotate_channel.key;
2002
2003 channel = consumer_find_channel(key);
2004 if (!channel) {
2005 DBG("Channel %" PRIu64 " not found", key);
2006 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
2007 } else {
2008 /*
2009 * Sample the rotate position of all the streams in
2010 * this channel.
2011 */
2012 ret = lttng_consumer_rotate_channel(channel, key,
2013 msg.u.rotate_channel.relayd_id,
2014 msg.u.rotate_channel.metadata,
2015 ctx);
2016 if (ret < 0) {
2017 ERR("Rotate channel failed");
2018 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
2019 }
2020
2021 health_code_update();
2022 }
2023 ret = consumer_send_status_msg(sock, ret_code);
2024 if (ret < 0) {
2025 /* Somehow, the session daemon is not responding anymore. */
2026 goto end_rotate_channel_nosignal;
2027 }
2028
2029 /*
2030 * Rotate the streams that are ready right now.
2031 * FIXME: this is a second consecutive iteration over the
2032 * streams in a channel, there is probably a better way to
2033 * handle this, but it needs to be after the
2034 * consumer_send_status_msg() call.
2035 */
2036 if (channel) {
2037 ret = lttng_consumer_rotate_ready_streams(
2038 channel, key, ctx);
2039 if (ret < 0) {
2040 ERR("Rotate channel failed");
2041 }
2042 }
2043 break;
2044 end_rotate_channel_nosignal:
2045 goto end_nosignal;
2046 }
2047 case LTTNG_CONSUMER_CLEAR_CHANNEL:
2048 {
2049 struct lttng_consumer_channel *channel;
2050 uint64_t key = msg.u.clear_channel.key;
2051
2052 channel = consumer_find_channel(key);
2053 if (!channel) {
2054 DBG("Channel %" PRIu64 " not found", key);
2055 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
2056 } else {
2057 ret = lttng_consumer_clear_channel(channel);
2058 if (ret) {
2059 ERR("Clear channel failed key %" PRIu64, key);
2060 ret_code = ret;
2061 }
2062
2063 health_code_update();
2064 }
2065 ret = consumer_send_status_msg(sock, ret_code);
2066 if (ret < 0) {
2067 /* Somehow, the session daemon is not responding anymore. */
2068 goto end_nosignal;
2069 }
2070 break;
2071 }
2072 case LTTNG_CONSUMER_INIT:
2073 {
2074 ret_code = lttng_consumer_init_command(ctx,
2075 msg.u.init.sessiond_uuid);
2076 health_code_update();
2077 ret = consumer_send_status_msg(sock, ret_code);
2078 if (ret < 0) {
2079 /* Somehow, the session daemon is not responding anymore. */
2080 goto end_nosignal;
2081 }
2082 break;
2083 }
2084 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
2085 {
2086 const struct lttng_credentials credentials = {
2087 .uid = msg.u.create_trace_chunk.credentials.value.uid,
2088 .gid = msg.u.create_trace_chunk.credentials.value.gid,
2089 };
2090 const bool is_local_trace =
2091 !msg.u.create_trace_chunk.relayd_id.is_set;
2092 const uint64_t relayd_id =
2093 msg.u.create_trace_chunk.relayd_id.value;
2094 const char *chunk_override_name =
2095 *msg.u.create_trace_chunk.override_name ?
2096 msg.u.create_trace_chunk.override_name :
2097 NULL;
2098 struct lttng_directory_handle *chunk_directory_handle = NULL;
2099
2100 /*
2101 * The session daemon will only provide a chunk directory file
2102 * descriptor for local traces.
2103 */
2104 if (is_local_trace) {
2105 int chunk_dirfd;
2106
2107 /* Acknowledge the reception of the command. */
2108 ret = consumer_send_status_msg(sock,
2109 LTTCOMM_CONSUMERD_SUCCESS);
2110 if (ret < 0) {
2111 /* Somehow, the session daemon is not responding anymore. */
2112 goto end_nosignal;
2113 }
2114
2115 /*
2116 * Receive trace chunk domain dirfd.
2117 */
2118 ret = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
2119 if (ret != sizeof(chunk_dirfd)) {
2120 ERR("Failed to receive trace chunk domain directory file descriptor");
2121 goto error_fatal;
2122 }
2123
2124 DBG("Received trace chunk domain directory fd (%d)",
2125 chunk_dirfd);
2126 chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
2127 chunk_dirfd);
2128 if (!chunk_directory_handle) {
2129 ERR("Failed to initialize chunk domain directory handle from directory file descriptor");
2130 if (close(chunk_dirfd)) {
2131 PERROR("Failed to close chunk directory file descriptor");
2132 }
2133 goto error_fatal;
2134 }
2135 }
2136
2137 ret_code = lttng_consumer_create_trace_chunk(
2138 !is_local_trace ? &relayd_id : NULL,
2139 msg.u.create_trace_chunk.session_id,
2140 msg.u.create_trace_chunk.chunk_id,
2141 (time_t) msg.u.create_trace_chunk
2142 .creation_timestamp,
2143 chunk_override_name,
2144 msg.u.create_trace_chunk.credentials.is_set ?
2145 &credentials :
2146 NULL,
2147 chunk_directory_handle);
2148 lttng_directory_handle_put(chunk_directory_handle);
2149 goto end_msg_sessiond;
2150 }
2151 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
2152 {
2153 enum lttng_trace_chunk_command_type close_command =
2154 msg.u.close_trace_chunk.close_command.value;
2155 const uint64_t relayd_id =
2156 msg.u.close_trace_chunk.relayd_id.value;
2157 struct lttcomm_consumer_close_trace_chunk_reply reply;
2158 char closed_trace_chunk_path[LTTNG_PATH_MAX];
2159 int ret;
2160
2161 ret_code = lttng_consumer_close_trace_chunk(
2162 msg.u.close_trace_chunk.relayd_id.is_set ?
2163 &relayd_id :
2164 NULL,
2165 msg.u.close_trace_chunk.session_id,
2166 msg.u.close_trace_chunk.chunk_id,
2167 (time_t) msg.u.close_trace_chunk.close_timestamp,
2168 msg.u.close_trace_chunk.close_command.is_set ?
2169 &close_command :
2170 NULL, closed_trace_chunk_path);
2171 reply.ret_code = ret_code;
2172 reply.path_length = strlen(closed_trace_chunk_path) + 1;
2173 ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
2174 if (ret != sizeof(reply)) {
2175 goto error_fatal;
2176 }
2177 ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path,
2178 reply.path_length);
2179 if (ret != reply.path_length) {
2180 goto error_fatal;
2181 }
2182 goto end_nosignal;
2183 }
2184 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
2185 {
2186 const uint64_t relayd_id =
2187 msg.u.trace_chunk_exists.relayd_id.value;
2188
2189 ret_code = lttng_consumer_trace_chunk_exists(
2190 msg.u.trace_chunk_exists.relayd_id.is_set ?
2191 &relayd_id : NULL,
2192 msg.u.trace_chunk_exists.session_id,
2193 msg.u.trace_chunk_exists.chunk_id);
2194 goto end_msg_sessiond;
2195 }
2196 default:
2197 break;
2198 }
2199
2200 end_nosignal:
2201 /*
2202 * Return 1 to indicate success since a return value of 0 can indicate a
2203 * socket shutdown during the recv() or send() call.
2204 */
2205 ret = 1;
2206 goto end;
2207
2208 end_msg_sessiond:
2209 /*
2210 * The returned value here is not useful since, either way, we will return
2211 * 1 to the caller because the session daemon socket management is done
2212 * elsewhere. Returning a negative code or 0 would shut down the consumer.
2213 */
2214 ret = consumer_send_status_msg(sock, ret_code);
2215 if (ret < 0) {
2216 goto error_fatal;
2217 }
2218 ret = 1;
2219 goto end;
2220
2221 end_channel_error:
2222 if (channel) {
2223 /*
2224 * Free channel here since no one has a reference to it. We don't
2225 * free after that because a stream can store this pointer.
2226 */
2227 destroy_channel(channel);
2228 }
2229 /* We have to send a status channel message indicating an error. */
2230 ret = consumer_send_status_channel(sock, NULL);
2231 if (ret < 0) {
2232 /* Stop everything if the session daemon cannot be notified. */
2233 goto error_fatal;
2234 }
2235 ret = 1;
2236 goto end;
2237
2238 error_fatal:
2239 /* This will issue a consumer stop. */
2240 ret = -1;
2241 goto end;
2242
2243 end:
2244 rcu_read_unlock();
2245 health_code_update();
2246 return ret;
2247 }
2248
2249 void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream,
2250 int producer_active)
2251 {
2252 assert(stream);
2253 assert(stream->ustream);
2254
2255 ustctl_flush_buffer(stream->ustream, producer_active);
2256 }
2257
2258 /*
2259 * Take a snapshot for a specific stream.
2260 *
2261 * Returns 0 on success, < 0 on error
2262 */
2263 int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
2264 {
2265 assert(stream);
2266 assert(stream->ustream);
2267
2268 return ustctl_snapshot(stream->ustream);
2269 }
2270
2271 /*
2272 * Sample consumed and produced positions for a specific stream.
2273 *
2274 * Returns 0 on success, < 0 on error.
2275 */
2276 int lttng_ustconsumer_sample_snapshot_positions(
2277 struct lttng_consumer_stream *stream)
2278 {
2279 assert(stream);
2280 assert(stream->ustream);
2281
2282 return ustctl_snapshot_sample_positions(stream->ustream);
2283 }
2284
2285 /*
2286 * Get the produced position
2287 *
2288 * Returns 0 on success, < 0 on error
2289 */
2290 int lttng_ustconsumer_get_produced_snapshot(
2291 struct lttng_consumer_stream *stream, unsigned long *pos)
2292 {
2293 assert(stream);
2294 assert(stream->ustream);
2295 assert(pos);
2296
2297 return ustctl_snapshot_get_produced(stream->ustream, pos);
2298 }
2299
2300 /*
2301 * Get the consumed position
2302 *
2303 * Returns 0 on success, < 0 on error
2304 */
2305 int lttng_ustconsumer_get_consumed_snapshot(
2306 struct lttng_consumer_stream *stream, unsigned long *pos)
2307 {
2308 assert(stream);
2309 assert(stream->ustream);
2310 assert(pos);
2311
2312 return ustctl_snapshot_get_consumed(stream->ustream, pos);
2313 }
2314
2315 void lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
2316 int producer)
2317 {
2318 assert(stream);
2319 assert(stream->ustream);
2320
2321 ustctl_flush_buffer(stream->ustream, producer);
2322 }
2323
2324 void lttng_ustconsumer_clear_buffer(struct lttng_consumer_stream *stream)
2325 {
2326 assert(stream);
2327 assert(stream->ustream);
2328
2329 ustctl_clear_buffer(stream->ustream);
2330 }
2331
2332 int lttng_ustconsumer_get_current_timestamp(
2333 struct lttng_consumer_stream *stream, uint64_t *ts)
2334 {
2335 assert(stream);
2336 assert(stream->ustream);
2337 assert(ts);
2338
2339 return ustctl_get_current_timestamp(stream->ustream, ts);
2340 }
2341
2342 int lttng_ustconsumer_get_sequence_number(
2343 struct lttng_consumer_stream *stream, uint64_t *seq)
2344 {
2345 assert(stream);
2346 assert(stream->ustream);
2347 assert(seq);
2348
2349 return ustctl_get_sequence_number(stream->ustream, seq);
2350 }
2351
2352 /*
2353 * Called when the stream signals the consumer that it has hung up.
2354 */
2355 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
2356 {
2357 assert(stream);
2358 assert(stream->ustream);
2359
2360 pthread_mutex_lock(&stream->lock);
2361 if (!stream->quiescent) {
2362 ustctl_flush_buffer(stream->ustream, 0);
2363 stream->quiescent = true;
2364 }
2365 pthread_mutex_unlock(&stream->lock);
2366 stream->hangup_flush_done = 1;
2367 }
2368
2369 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
2370 {
2371 int i;
2372
2373 assert(chan);
2374 assert(chan->uchan);
2375 assert(chan->buffer_credentials.is_set);
2376
2377 if (chan->switch_timer_enabled == 1) {
2378 consumer_timer_switch_stop(chan);
2379 }
2380 for (i = 0; i < chan->nr_stream_fds; i++) {
2381 int ret;
2382
2383 ret = close(chan->stream_fds[i]);
2384 if (ret) {
2385 PERROR("close");
2386 }
2387 if (chan->shm_path[0]) {
2388 char shm_path[PATH_MAX];
2389
2390 ret = get_stream_shm_path(shm_path, chan->shm_path, i);
2391 if (ret) {
2392 ERR("Cannot get stream shm path");
2393 }
2394 ret = run_as_unlink(shm_path,
2395 chan->buffer_credentials.value.uid,
2396 chan->buffer_credentials.value.gid);
2397 if (ret) {
2398 PERROR("unlink %s", shm_path);
2399 }
2400 }
2401 }
2402 }
2403
2404 void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
2405 {
2406 assert(chan);
2407 assert(chan->uchan);
2408 assert(chan->buffer_credentials.is_set);
2409
2410 consumer_metadata_cache_destroy(chan);
2411 ustctl_destroy_channel(chan->uchan);
2412 /* Try to rmdir all directories under shm_path root. */
2413 if (chan->root_shm_path[0]) {
2414 (void) run_as_rmdir_recursive(chan->root_shm_path,
2415 chan->buffer_credentials.value.uid,
2416 chan->buffer_credentials.value.gid,
2417 LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
2418 }
2419 free(chan->stream_fds);
2420 }
2421
2422 void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
2423 {
2424 assert(stream);
2425 assert(stream->ustream);
2426
2427 if (stream->chan->switch_timer_enabled == 1) {
2428 consumer_timer_switch_stop(stream->chan);
2429 }
2430 ustctl_destroy_stream(stream->ustream);
2431 }
2432
2433 int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
2434 {
2435 assert(stream);
2436 assert(stream->ustream);
2437
2438 return ustctl_stream_get_wakeup_fd(stream->ustream);
2439 }
2440
2441 int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
2442 {
2443 assert(stream);
2444 assert(stream->ustream);
2445
2446 return ustctl_stream_close_wakeup_fd(stream->ustream);
2447 }
2448
2449 /*
2450 * Populate index values of a UST stream. Values are set in big endian order.
2451 *
2452 * Return 0 on success or else a negative value.
2453 */
2454 static int get_index_values(struct ctf_packet_index *index,
2455 struct ustctl_consumer_stream *ustream)
2456 {
2457 int ret;
2458 uint64_t packet_size, content_size, timestamp_begin, timestamp_end,
2459 events_discarded, stream_id, stream_instance_id,
2460 packet_seq_num;
2461
2462 ret = ustctl_get_timestamp_begin(ustream, &timestamp_begin);
2463 if (ret < 0) {
2464 PERROR("ustctl_get_timestamp_begin");
2465 goto error;
2466 }
2467
2468 ret = ustctl_get_timestamp_end(ustream, &timestamp_end);
2469 if (ret < 0) {
2470 PERROR("ustctl_get_timestamp_end");
2471 goto error;
2472 }
2473
2474 ret = ustctl_get_events_discarded(ustream, &events_discarded);
2475 if (ret < 0) {
2476 PERROR("ustctl_get_events_discarded");
2477 goto error;
2478 }
2479
2480 ret = ustctl_get_content_size(ustream, &content_size);
2481 if (ret < 0) {
2482 PERROR("ustctl_get_content_size");
2483 goto error;
2484 }
2485
2486 ret = ustctl_get_packet_size(ustream, &packet_size);
2487 if (ret < 0) {
2488 PERROR("ustctl_get_packet_size");
2489 goto error;
2490 }
2491
2492 ret = ustctl_get_stream_id(ustream, &stream_id);
2493 if (ret < 0) {
2494 PERROR("ustctl_get_stream_id");
2495 goto error;
2496 }
2497
2498 ret = ustctl_get_instance_id(ustream, &stream_instance_id);
2499 if (ret < 0) {
2500 PERROR("ustctl_get_instance_id");
2501 goto error;
2502 }
2503
2504 ret = ustctl_get_sequence_number(ustream, &packet_seq_num);
2505 if (ret < 0) {
2506 PERROR("ustctl_get_sequence_number");
2507 goto error;
2508 }
2509
2510 *index = (typeof(*index)) {
2511 .offset = index->offset,
2512 .packet_size = htobe64(packet_size),
2513 .content_size = htobe64(content_size),
2514 .timestamp_begin = htobe64(timestamp_begin),
2515 .timestamp_end = htobe64(timestamp_end),
2516 .events_discarded = htobe64(events_discarded),
2517 .stream_id = htobe64(stream_id),
2518 .stream_instance_id = htobe64(stream_instance_id),
2519 .packet_seq_num = htobe64(packet_seq_num),
2520 };
2521
2522 error:
2523 return ret;
2524 }
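
/*
 * The index fields above are stored big-endian (CTF on-disk byte order). A
 * minimal sketch of reading one back in host byte order with be64toh()
 * (hypothetical helper, illustrative only, not part of the original source):
 */
static uint64_t index_content_size_host(const struct ctf_packet_index *index)
{
	return be64toh(index->content_size);
}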
2525
2526 static
2527 void metadata_stream_reset_cache(struct lttng_consumer_stream *stream,
2528 struct consumer_metadata_cache *cache)
2529 {
2530 DBG("Metadata stream update to version %" PRIu64,
2531 cache->version);
2532 stream->ust_metadata_pushed = 0;
2533 stream->metadata_version = cache->version;
2534 stream->reset_metadata_flag = 1;
2535 }
2536
2537 /*
2538 * Check if the version of the metadata stream and metadata cache match.
2539 * If the cache got updated, reset the metadata stream.
2540 * The stream lock and metadata cache lock MUST be held.
2541 * Return 0 on success, a negative value on error.
2542 */
2543 static
2544 int metadata_stream_check_version(struct lttng_consumer_stream *stream)
2545 {
2546 int ret = 0;
2547 struct consumer_metadata_cache *cache = stream->chan->metadata_cache;
2548
2549 if (cache->version == stream->metadata_version) {
2550 goto end;
2551 }
2552 metadata_stream_reset_cache(stream, cache);
2553
2554 end:
2555 return ret;
2556 }
2557
2558 /*
2559 * Write up to one packet from the metadata cache to the channel.
2560 *
2561 * Returns the number of bytes pushed in the cache, or a negative value
2562 * on error.
2563 */
2564 static
2565 int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
2566 {
2567 ssize_t write_len;
2568 int ret;
2569
2570 pthread_mutex_lock(&stream->chan->metadata_cache->lock);
2571 ret = metadata_stream_check_version(stream);
2572 if (ret < 0) {
2573 goto end;
2574 }
2575 if (stream->chan->metadata_cache->max_offset
2576 == stream->ust_metadata_pushed) {
2577 ret = 0;
2578 goto end;
2579 }
2580
2581 write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
2582 &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
2583 stream->chan->metadata_cache->max_offset
2584 - stream->ust_metadata_pushed);
2585 assert(write_len != 0);
2586 if (write_len < 0) {
2587 ERR("Writing one metadata packet");
2588 ret = -1;
2589 goto end;
2590 }
2591 stream->ust_metadata_pushed += write_len;
2592
2593 assert(stream->chan->metadata_cache->max_offset >=
2594 stream->ust_metadata_pushed);
2595 ret = write_len;
2596
2597 /*
2598 * Switch packet (but don't open the next one) on every commit of
2599 * a metadata packet. Since the subbuffer is fully filled (with padding,
2600 * if needed), the stream is "quiescent" after this commit.
2601 */
2602 ustctl_flush_buffer(stream->ustream, 1);
2603 stream->quiescent = true;
2604 end:
2605 pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
2606 return ret;
2607 }
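
/*
 * commit_one_metadata_packet() pushes at most one packet per call. A minimal
 * sketch of how a hypothetical caller, holding the stream lock as the
 * existing callers do, could drain the whole metadata cache (illustrative
 * only, not part of the original source):
 */
static int drain_metadata_cache(struct lttng_consumer_stream *stream)
{
	int ret;

	do {
		ret = commit_one_metadata_packet(stream);
	} while (ret > 0);

	/* 0 means the cache was fully pushed; < 0 is an error. */
	return ret;
}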
2608
2609
2610 /*
2611 * Sync metadata: request it from the session daemon and snapshot it so
2612 * that the metadata thread can consume it.
2613 *
2614 * Metadata stream lock is held here, but we need to release it when
2615 * interacting with sessiond, else we cause a deadlock with live
2616 * awaiting on metadata to be pushed out.
2617 *
2618 * The RCU read side lock must be held by the caller.
2619 *
2620 * Return 0 if new metadata is available, EAGAIN if more metadata remains to
2621 * be consumed, ENODATA if the stream is empty, or a negative value on error.
2622 */
2623 int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
2624 struct lttng_consumer_stream *metadata_stream)
2625 {
2626 int ret;
2627 int retry = 0;
2628 struct lttng_consumer_channel *metadata_channel;
2629
2630 assert(ctx);
2631 assert(metadata_stream);
2632
2633 metadata_channel = metadata_stream->chan;
2634 pthread_mutex_unlock(&metadata_stream->lock);
2635 /*
2636 * Request metadata from the sessiond, but don't wait for the flush
2637 * because we locked the metadata thread.
2638 */
2639 ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
2640 pthread_mutex_lock(&metadata_stream->lock);
2641 if (ret < 0) {
2642 goto end;
2643 }
2644
2645 /*
2646 * The metadata stream and channel can be deleted while the
2647 * metadata stream lock was released. The stream is checked
2648 * for deletion before we use it further.
2649 *
2650 * Note that it is safe to access a logically-deleted stream since its
2651 * existence is still guaranteed by the RCU read side lock. However,
2652 * it should no longer be used. The close/deletion of the metadata
2653 * channel and stream already guarantees that all metadata has been
2654 * consumed. Therefore, there is nothing left to do in this function.
2655 */
2656 if (consumer_stream_is_deleted(metadata_stream)) {
2657 DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization",
2658 metadata_stream->key);
2659 ret = 0;
2660 goto end;
2661 }
2662
2663 ret = commit_one_metadata_packet(metadata_stream);
2664 if (ret <= 0) {
2665 goto end;
2666 } else if (ret > 0) {
2667 retry = 1;
2668 }
2669
2670 ret = ustctl_snapshot(metadata_stream->ustream);
2671 if (ret < 0) {
2672 if (errno != EAGAIN) {
2673 ERR("Sync metadata, taking UST snapshot");
2674 goto end;
2675 }
2676 DBG("No new metadata when syncing them.");
2677 /* No new metadata, exit. */
2678 ret = ENODATA;
2679 goto end;
2680 }
2681
2682 /*
2683 * After this flush, we still need to extract metadata.
2684 */
2685 if (retry) {
2686 ret = EAGAIN;
2687 }
2688
2689 end:
2690 return ret;
2691 }
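
/*
 * Minimal sketch of a hypothetical caller honouring the return convention of
 * lttng_ustconsumer_sync_metadata() (illustrative only, not part of the
 * original source): retry while more metadata remains (EAGAIN) and treat an
 * empty stream (ENODATA) as success. The caller is assumed to hold the
 * metadata stream lock and the RCU read side lock, as documented above.
 */
static int sync_metadata_until_done(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *metadata_stream)
{
	int ret;

	do {
		ret = lttng_ustconsumer_sync_metadata(ctx, metadata_stream);
	} while (ret == EAGAIN);

	return ret == ENODATA ? 0 : ret;
}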
2692
2693 /*
2694 * Return 0 on success else a negative value.
2695 */
2696 static int notify_if_more_data(struct lttng_consumer_stream *stream,
2697 struct lttng_consumer_local_data *ctx)
2698 {
2699 int ret;
2700 struct ustctl_consumer_stream *ustream;
2701
2702 assert(stream);
2703 assert(ctx);
2704
2705 ustream = stream->ustream;
2706
2707 /*
2708 * First, we are going to check if there is a new subbuffer available
2709 * before reading the stream wait_fd.
2710 */
2711 /* Get the next subbuffer */
2712 ret = ustctl_get_next_subbuf(ustream);
2713 if (ret) {
2714 /* No more data found, flag the stream. */
2715 stream->has_data = 0;
2716 ret = 0;
2717 goto end;
2718 }
2719
2720 ret = ustctl_put_subbuf(ustream);
2721 assert(!ret);
2722
2723 /* This stream still has data. Flag it and wake up the data thread. */
2724 stream->has_data = 1;
2725
2726 if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
2727 ssize_t writelen;
2728
2729 writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
2730 if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
2731 ret = writelen;
2732 goto end;
2733 }
2734
2735 /* The wake up pipe has been notified. */
2736 ctx->has_wakeup = 1;
2737 }
2738 ret = 0;
2739
2740 end:
2741 return ret;
2742 }
2743
2744 static
2745 int update_stream_stats(struct lttng_consumer_stream *stream)
2746 {
2747 int ret;
2748 uint64_t seq, discarded;
2749
2750 ret = ustctl_get_sequence_number(stream->ustream, &seq);
2751 if (ret < 0) {
2752 PERROR("ustctl_get_sequence_number");
2753 goto end;
2754 }
2755 /*
2756 * Start the sequence from the first extracted packet, in case it does
2757 * not start at 0 (for example, if a consumer was not connected to the
2758 * session right from the start).
2759 */
2760 if (stream->last_sequence_number == -1ULL) {
2761 stream->last_sequence_number = seq;
2762 } else if (seq > stream->last_sequence_number) {
2763 stream->chan->lost_packets += seq -
2764 stream->last_sequence_number - 1;
2765 } else {
2766 /* seq <= last_sequence_number */
2767 ERR("Sequence number inconsistent : prev = %" PRIu64
2768 ", current = %" PRIu64,
2769 stream->last_sequence_number, seq);
2770 ret = -1;
2771 goto end;
2772 }
2773 stream->last_sequence_number = seq;
2774
2775 ret = ustctl_get_events_discarded(stream->ustream, &discarded);
2776 if (ret < 0) {
2777 PERROR("kernctl_get_events_discarded");
2778 goto end;
2779 }
2780 if (discarded < stream->last_discarded_events) {
2781 /*
2782 * Overflow has occurred. We assume only one wrap-around
2783 * has occurred.
2784 */
2785 stream->chan->discarded_events +=
2786 (1ULL << (CAA_BITS_PER_LONG - 1)) -
2787 stream->last_discarded_events + discarded;
2788 } else {
2789 stream->chan->discarded_events += discarded -
2790 stream->last_discarded_events;
2791 }
2792 stream->last_discarded_events = discarded;
2793 ret = 0;
2794
2795 end:
2796 return ret;
2797 }
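
/*
 * The discarded-events accounting above must cope with a counter that wraps
 * at 2^(CAA_BITS_PER_LONG - 1). A minimal sketch of that arithmetic in
 * isolation (hypothetical helper, illustrative only, not part of the
 * original source; assumes at most one wrap-around, as
 * update_stream_stats() does):
 */
static uint64_t discarded_events_delta(uint64_t previous, uint64_t current)
{
	if (current < previous) {
		/* The counter wrapped around once. */
		return (1ULL << (CAA_BITS_PER_LONG - 1)) - previous + current;
	}
	return current - previous;
}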
2798
2799 /*
2800 * Read subbuffer from the given stream.
2801 *
2802 * Stream and channel locks MUST be acquired by the caller.
2803 *
2804 * Return 0 on success else a negative value.
2805 */
2806 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
2807 struct lttng_consumer_local_data *ctx)
2808 {
2809 unsigned long len, subbuf_size, padding;
2810 int err, write_index = 1, rotation_ret;
2811 long ret = 0;
2812 struct ustctl_consumer_stream *ustream;
2813 struct ctf_packet_index index;
2814 const char *subbuf_addr;
2815 struct lttng_buffer_view subbuf_view;
2816
2817 assert(stream);
2818 assert(stream->ustream);
2819 assert(ctx);
2820
2821 DBG("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
2822 stream->name);
2823
2824 /* Ease our life for what's next. */
2825 ustream = stream->ustream;
2826
2827 /*
2828 * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
2829 * an error if we cannot read this one byte (read returns 0), or if the
2830 * error is EAGAIN or EWOULDBLOCK.
2831 *
2832 * This is only done when the stream is monitored by a thread, before the
2833 * post-hangup flush is done, and if the stream is not flagged as having
2834 * data, since the wait fd might have nothing to consume while data is
2835 * still flagged as available by the consumer wake up pipe.
2836 */
2837 if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
2838 char dummy;
2839 ssize_t readlen;
2840
2841 readlen = lttng_read(stream->wait_fd, &dummy, 1);
2842 if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
2843 ret = readlen;
2844 goto error;
2845 }
2846 }
2847
2848 /*
2849 * If the stream was flagged to be ready for rotation before we extract the
2850 * next packet, rotate it now.
2851 */
2852 if (stream->rotate_ready) {
2853 DBG("Rotate stream before extracting data");
2854 rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
2855 if (rotation_ret < 0) {
2856 ERR("Stream rotation error");
2857 ret = -1;
2858 goto error;
2859 }
2860 }
2861
2862 retry:
2863 /* Get the next subbuffer */
2864 err = ustctl_get_next_subbuf(ustream);
2865 if (err != 0) {
2866 /*
2867 * Populate metadata info if the existing info has
2868 * already been read.
2869 */
2870 if (stream->metadata_flag) {
2871 ret = commit_one_metadata_packet(stream);
2872 if (ret <= 0) {
2873 goto error;
2874 }
2875 goto retry;
2876 }
2877
2878 ret = err; /* ustctl_get_next_subbuf() returns negative; the caller expects positive. */
2879 /*
2880 * This is a debug message even for a single-threaded consumer,
2881 * because poll() has more relaxed criteria than get_next_subbuf(),
2882 * so get_next_subbuf() may fail for short race windows where poll()
2883 * would issue wakeups.
2884 */
2885 DBG("Reserving sub buffer failed (everything is normal, "
2886 "it is due to concurrency) [ret: %d]", err);
2887 goto error;
2888 }
2889 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
2890
2891 if (!stream->metadata_flag) {
2892 index.offset = htobe64(stream->out_fd_offset);
2893 ret = get_index_values(&index, ustream);
2894 if (ret < 0) {
2895 err = ustctl_put_subbuf(ustream);
2896 assert(err == 0);
2897 goto error;
2898 }
2899
2900 /* Update the stream's sequence and discarded events count. */
2901 ret = update_stream_stats(stream);
2902 if (ret < 0) {
2903 PERROR("kernctl_get_events_discarded");
2904 err = ustctl_put_subbuf(ustream);
2905 assert(err == 0);
2906 goto error;
2907 }
2908 } else {
2909 write_index = 0;
2910 }
2911
2912 /* Get the full padded subbuffer size */
2913 err = ustctl_get_padded_subbuf_size(ustream, &len);
2914 assert(err == 0);
2915
2916 /* Get subbuffer data size (without padding) */
2917 err = ustctl_get_subbuf_size(ustream, &subbuf_size);
2918 assert(err == 0);
2919
2920 /* Make sure we don't get a subbuffer size bigger than the padded */
2921 assert(len >= subbuf_size);
2922
2923 padding = len - subbuf_size;
2924
2925 ret = get_current_subbuf_addr(stream, &subbuf_addr);
2926 if (ret) {
2927 write_index = 0;
2928 goto error_put_subbuf;
2929 }
2930
2931 subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
2932
2933 /* write the subbuffer to the tracefile */
2934 ret = lttng_consumer_on_read_subbuffer_mmap(
2935 ctx, stream, &subbuf_view, padding, &index);
2936 /*
2937 * The mmap operation should write subbuf_size bytes when network
2938 * streaming or the full padded size (len) when we are _not_
2939 * streaming.
2940 */
2941 if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
2942 (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
2943 /*
2944 * Display the error but continue processing to try to release the
2945 * subbuffer. This is a DBG statement since an unexpected kill or
2946 * signal, the application unregistering, the relayd closing, or
2947 * anything else that affects the buffer lifetime will trigger it.
2948 * So, for the sake of the user, don't print this as an error since
2949 * it can happen and is acceptable within the code flow.
2950 */
2951 DBG("Error writing to tracefile "
2952 "(ret: %ld != len: %lu != subbuf_size: %lu)",
2953 ret, len, subbuf_size);
2954 write_index = 0;
2955 }
2956 error_put_subbuf:
2957 err = ustctl_put_next_subbuf(ustream);
2958 assert(err == 0);
2959
2960 /*
2961 * This will consume the byte on the wait_fd if and only if there is no
2962 * next subbuffer to be acquired.
2963 */
2964 if (!stream->metadata_flag) {
2965 ret = notify_if_more_data(stream, ctx);
2966 if (ret < 0) {
2967 goto error;
2968 }
2969 }
2970
2971 /* Write index if needed. */
2972 if (!write_index) {
2973 goto rotate;
2974 }
2975
2976 if (stream->chan->live_timer_interval && !stream->metadata_flag) {
2977 /*
2978 * In live, block until all the metadata is sent.
2979 */
2980 pthread_mutex_lock(&stream->metadata_timer_lock);
2981 assert(!stream->missed_metadata_flush);
2982 stream->waiting_on_metadata = true;
2983 pthread_mutex_unlock(&stream->metadata_timer_lock);
2984
2985 err = consumer_stream_sync_metadata(ctx, stream->session_id);
2986
2987 pthread_mutex_lock(&stream->metadata_timer_lock);
2988 stream->waiting_on_metadata = false;
2989 if (stream->missed_metadata_flush) {
2990 stream->missed_metadata_flush = false;
2991 pthread_mutex_unlock(&stream->metadata_timer_lock);
2992 (void) consumer_flush_ust_index(stream);
2993 } else {
2994 pthread_mutex_unlock(&stream->metadata_timer_lock);
2995 }
2996
2997 if (err < 0) {
2998 goto error;
2999 }
3000 }
3001
3002 assert(!stream->metadata_flag);
3003 err = consumer_stream_write_index(stream, &index);
3004 if (err < 0) {
3005 goto error;
3006 }
3007
3008 rotate:
3009 /*
3010 * After extracting the packet, we check if the stream is now ready to be
3011 * rotated and perform the action immediately.
3012 */
3013 rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
3014 if (rotation_ret == 1) {
3015 rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
3016 if (rotation_ret < 0) {
3017 ERR("Stream rotation error");
3018 ret = -1;
3019 goto error;
3020 }
3021 } else if (rotation_ret < 0) {
3022 ERR("Checking if stream is ready to rotate");
3023 ret = -1;
3024 goto error;
3025 }
3026 error:
3027 return ret;
3028 }
3029
3030 /*
3031 * Called when a stream is created.
3032 *
3033 * Return 0 on success or else a negative value.
3034 */
3035 int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
3036 {
3037 int ret;
3038
3039 assert(stream);
3040
3041 /*
3042 * Don't create anything if this is set for streaming or if there is
3043 * no current trace chunk on the parent channel.
3044 */
3045 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
3046 stream->chan->trace_chunk) {
3047 ret = consumer_stream_create_output_files(stream, true);
3048 if (ret) {
3049 goto error;
3050 }
3051 }
3052 ret = 0;
3053
3054 error:
3055 return ret;
3056 }
3057
3058 /*
3059 * Check if data is still being extracted from the buffers for a specific
3060 * stream. The consumer data lock and the stream lock MUST be acquired
3061 * before calling this function.
3062 *
3063 * Return 1 if the traced data is still being read, else 0, meaning that
3064 * the data is available for trace viewer reading.
3065 */
3066 int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
3067 {
3068 int ret;
3069
3070 assert(stream);
3071 assert(stream->ustream);
3072
3073 DBG("UST consumer checking data pending");
3074
3075 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
3076 ret = 0;
3077 goto end;
3078 }
3079
3080 if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) {
3081 uint64_t contiguous, pushed;
3082
3083 /* Ease our life a bit. */
3084 contiguous = stream->chan->metadata_cache->max_offset;
3085 pushed = stream->ust_metadata_pushed;
3086
3087 /*
3088 * We can simply check whether all contiguously available data
3089 * has been pushed to the ring buffer, since the push operation
3090 * is performed within get_next_subbuf(), and because both
3091 * get_next_subbuf() and put_next_subbuf() are issued atomically
3092 * thanks to the stream lock within
3093 * lttng_ustconsumer_read_subbuffer(). This basically means that
3094 * whenever ust_metadata_pushed is incremented, the associated
3095 * metadata has been consumed from the metadata stream.
3096 */
3097 DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
3098 contiguous, pushed);
3099 assert(((int64_t) (contiguous - pushed)) >= 0);
3100 if ((contiguous != pushed) ||
3101 (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
3102 ret = 1; /* Data is pending */
3103 goto end;
3104 }
3105 } else {
3106 ret = ustctl_get_next_subbuf(stream->ustream);
3107 if (ret == 0) {
3108 /*
3109 * There is still data so let's put back this
3110 * subbuffer.
3111 */
3112 ret = ustctl_put_subbuf(stream->ustream);
3113 assert(ret == 0);
3114 ret = 1; /* Data is pending */
3115 goto end;
3116 }
3117 }
3118
3119 /* Data is NOT pending so ready to be read. */
3120 ret = 0;
3121
3122 end:
3123 return ret;
3124 }
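
/*
 * Minimal sketch of a hypothetical caller of the function above, taking the
 * locks in the documented order, consumer data lock then stream lock
 * (illustrative only, not part of the original source):
 */
static int stream_has_data_pending(struct lttng_consumer_stream *stream)
{
	int pending;

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->lock);
	pending = lttng_ustconsumer_data_pending(stream);
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	return pending;
}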
3125
3126 /*
3127 * Stop a given metadata channel timer if enabled and close the wait fd which
3128 * is the poll pipe of the metadata stream.
3129 *
3130 * This MUST be called with the metadata channel lock acquired.
3131 */
3132 void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata)
3133 {
3134 int ret;
3135
3136 assert(metadata);
3137 assert(metadata->type == CONSUMER_CHANNEL_TYPE_METADATA);
3138
3139 DBG("Closing metadata channel key %" PRIu64, metadata->key);
3140
3141 if (metadata->switch_timer_enabled == 1) {
3142 consumer_timer_switch_stop(metadata);
3143 }
3144
3145 if (!metadata->metadata_stream) {
3146 goto end;
3147 }
3148
3149 /*
3150 * Close the write side so that the thread monitoring the stream, if any,
3151 * wakes up and cleans up the metadata stream.
3152 */
3153 if (metadata->metadata_stream->ust_metadata_poll_pipe[1] >= 0) {
3154 ret = close(metadata->metadata_stream->ust_metadata_poll_pipe[1]);
3155 if (ret < 0) {
3156 PERROR("closing metadata pipe write side");
3157 }
3158 metadata->metadata_stream->ust_metadata_poll_pipe[1] = -1;
3159 }
3160
3161 end:
3162 return;
3163 }
3164
3165 /*
3166 * Close every metadata stream wait fd of the metadata hash table. This
3167 * function MUST be used very carefully so as not to run into a race between the
3168 * metadata thread handling streams and this function closing their wait fd.
3169 *
3170 * For UST, this is used when the session daemon hangs up. It is the metadata
3171 * producer, so calling this is safe because we are assured that no state change
3172 * can occur in the metadata thread for the streams in the hash table.
3173 */
3174 void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
3175 {
3176 struct lttng_ht_iter iter;
3177 struct lttng_consumer_stream *stream;
3178
3179 assert(metadata_ht);
3180 assert(metadata_ht->ht);
3181
3182 DBG("UST consumer closing all metadata streams");
3183
3184 rcu_read_lock();
3185 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
3186 node.node) {
3187
3188 health_code_update();
3189
3190 pthread_mutex_lock(&stream->chan->lock);
3191 lttng_ustconsumer_close_metadata(stream->chan);
3192 pthread_mutex_unlock(&stream->chan->lock);
3193
3194 }
3195 rcu_read_unlock();
3196 }
3197
3198 void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
3199 {
3200 int ret;
3201
3202 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
3203 if (ret < 0) {
3204 ERR("Unable to close wakeup fd");
3205 }
3206 }
3207
3208 /*
3209 * Please refer to consumer-timer.c before adding any lock within this
3210 * function or any of its callees. Timers have a very strict locking
3211 * semantic with respect to teardown. Failure to respect this semantic
3212 * introduces deadlocks.
3213 *
3214 * DON'T hold the metadata lock when calling this function, else this
3215 * can cause deadlock involving consumer awaiting for metadata to be
3216 * pushed out due to concurrent interaction with the session daemon.
3217 */
3218 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
3219 struct lttng_consumer_channel *channel, int timer, int wait)
3220 {
3221 struct lttcomm_metadata_request_msg request;
3222 struct lttcomm_consumer_msg msg;
3223 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3224 uint64_t len, key, offset, version;
3225 int ret;
3226
3227 assert(channel);
3228 assert(channel->metadata_cache);
3229
3230 memset(&request, 0, sizeof(request));
3231
3232 /* send the metadata request to sessiond */
3233 switch (consumer_data.type) {
3234 case LTTNG_CONSUMER64_UST:
3235 request.bits_per_long = 64;
3236 break;
3237 case LTTNG_CONSUMER32_UST:
3238 request.bits_per_long = 32;
3239 break;
3240 default:
3241 request.bits_per_long = 0;
3242 break;
3243 }
3244
3245 request.session_id = channel->session_id;
3246 request.session_id_per_pid = channel->session_id_per_pid;
3247 /*
3248 * Request the application UID here so the metadata of that application can
3249 * be sent back. The channel UID corresponds to the user UID of the session
3250 * used for the rights on the stream file(s).
3251 */
3252 request.uid = channel->ust_app_uid;
3253 request.key = channel->key;
3254
3255 DBG("Sending metadata request to sessiond, session id %" PRIu64
3256 ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64,
3257 request.session_id, request.session_id_per_pid, request.uid,
3258 request.key);
3259
3260 pthread_mutex_lock(&ctx->metadata_socket_lock);
3261
3262 health_code_update();
3263
3264 ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
3265 sizeof(request));
3266 if (ret < 0) {
3267 ERR("Asking metadata to sessiond");
3268 goto end;
3269 }
3270
3271 health_code_update();
3272
3273 /* Receive the metadata from sessiond */
3274 ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
3275 sizeof(msg));
3276 if (ret != sizeof(msg)) {
3277 DBG("Consumer received unexpected message size %d (expects %zu)",
3278 ret, sizeof(msg));
3279 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
3280 /*
3281 * The ret value might be 0, meaning an orderly shutdown, but this is OK
3282 * since the caller handles this.
3283 */
3284 goto end;
3285 }
3286
3287 health_code_update();
3288
3289 if (msg.cmd_type == LTTNG_ERR_UND) {
3290 /* No registry found */
3291 (void) consumer_send_status_msg(ctx->consumer_metadata_socket,
3292 ret_code);
3293 ret = 0;
3294 goto end;
3295 } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
3296 ERR("Unexpected cmd_type received %d", msg.cmd_type);
3297 ret = -1;
3298 goto end;
3299 }
3300
3301 len = msg.u.push_metadata.len;
3302 key = msg.u.push_metadata.key;
3303 offset = msg.u.push_metadata.target_offset;
3304 version = msg.u.push_metadata.version;
3305
3306 assert(key == channel->key);
3307 if (len == 0) {
3308 DBG("No new metadata to receive for key %" PRIu64, key);
3309 }
3310
3311 health_code_update();
3312
3313 /* Tell session daemon we are ready to receive the metadata. */
3314 ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
3315 LTTCOMM_CONSUMERD_SUCCESS);
3316 if (ret < 0 || len == 0) {
3317 /*
3318 * Somehow, the session daemon is not responding anymore or there is
3319 * nothing to receive.
3320 */
3321 goto end;
3322 }
3323
3324 health_code_update();
3325
3326 ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
3327 key, offset, len, version, channel, timer, wait);
3328 if (ret >= 0) {
3329 /*
3330 * Only send the status msg if the sessiond is alive, meaning a
3331 * non-negative ret code.
3332 */
3333 (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret);
3334 }
3335 ret = 0;
3336
3337 end:
3338 health_code_update();
3339
3340 pthread_mutex_unlock(&ctx->metadata_socket_lock);
3341 return ret;
3342 }
3343
3344 /*
3345 * Return the result of the ustctl call getting the stream id.
3346 */
3347 int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
3348 uint64_t *stream_id)
3349 {
3350 assert(stream);
3351 assert(stream_id);
3352
3353 return ustctl_get_stream_id(stream->ustream, stream_id);
3354 }