CUSTOM: consumer: ust: relayd: perform 2 stage relay_add_stream
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
d14d33bf
AM
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
3bd1e081
MD
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
d14d33bf
AM
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
17 */
18
6c1c0768 19#define _LGPL_SOURCE
3bd1e081 20#include <assert.h>
f02e1e8a 21#include <lttng/ust-ctl.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
dbb5dfe6 28#include <sys/stat.h>
3bd1e081 29#include <sys/types.h>
77c7c900 30#include <inttypes.h>
3bd1e081 31#include <unistd.h>
ffe60014 32#include <urcu/list.h>
331744e3 33#include <signal.h>
29d1a7ae
JG
34#include <stdbool.h>
35#include <stdint.h>
0857097f 36
51a9e1c7 37#include <bin/lttng-consumerd/health-consumerd.h>
990570ed 38#include <common/common.h>
10a8a223 39#include <common/sessiond-comm/sessiond-comm.h>
00e2e675 40#include <common/relayd/relayd.h>
dbb5dfe6 41#include <common/compat/fcntl.h>
f263b7fd 42#include <common/compat/endian.h>
c8fea79c
JR
43#include <common/consumer/consumer-metadata-cache.h>
44#include <common/consumer/consumer-stream.h>
45#include <common/consumer/consumer-timer.h>
fe4477ee 46#include <common/utils.h>
309167d2 47#include <common/index/index.h>
29d1a7ae 48#include <common/consumer/consumer.h>
d6ef77b3 49#include <common/optional.h>
10a8a223
DG
50
51#include "ust-consumer.h"
3bd1e081 52
45863397 53#define INT_MAX_STR_LEN 12 /* includes \0 */
4628484a 54
3bd1e081
MD
55extern struct lttng_consumer_global_data consumer_data;
56extern int consumer_poll_timeout;
57extern volatile int consumer_quit;
58
59/*
ffe60014
DG
60 * Free channel object and all streams associated with it. This MUST be used
61 * only and only if the channel has _NEVER_ been added to the global channel
62 * hash table.
3bd1e081 63 */
ffe60014 64static void destroy_channel(struct lttng_consumer_channel *channel)
3bd1e081 65{
ffe60014
DG
66 struct lttng_consumer_stream *stream, *stmp;
67
68 assert(channel);
69
70 DBG("UST consumer cleaning stream list");
71
72 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
73 send_node) {
9ce5646a
MD
74
75 health_code_update();
76
ffe60014
DG
77 cds_list_del(&stream->send_node);
78 ustctl_destroy_stream(stream->ustream);
79 free(stream);
80 }
81
82 /*
83 * If a channel is available meaning that was created before the streams
84 * were, delete it.
85 */
86 if (channel->uchan) {
87 lttng_ustconsumer_del_channel(channel);
b83e03c4 88 lttng_ustconsumer_free_channel(channel);
ffe60014
DG
89 }
90 free(channel);
91}
3bd1e081
MD
92
93/*
ffe60014 94 * Add channel to internal consumer state.
3bd1e081 95 *
ffe60014 96 * Returns 0 on success or else a negative value.
3bd1e081 97 */
ffe60014
DG
98static int add_channel(struct lttng_consumer_channel *channel,
99 struct lttng_consumer_local_data *ctx)
3bd1e081
MD
100{
101 int ret = 0;
102
ffe60014
DG
103 assert(channel);
104 assert(ctx);
105
106 if (ctx->on_recv_channel != NULL) {
107 ret = ctx->on_recv_channel(channel);
108 if (ret == 0) {
d8ef542d 109 ret = consumer_add_channel(channel, ctx);
ffe60014
DG
110 } else if (ret < 0) {
111 /* Most likely an ENOMEM. */
112 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
113 goto error;
114 }
115 } else {
d8ef542d 116 ret = consumer_add_channel(channel, ctx);
3bd1e081
MD
117 }
118
d88aee68 119 DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
ffe60014
DG
120
121error:
3bd1e081
MD
122 return ret;
123}
124
ffe60014
DG
125/*
126 * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the
127 * error value if applicable is set in it else it is kept untouched.
3bd1e081 128 *
ffe60014 129 * Return NULL on error else the newly allocated stream object.
3bd1e081 130 */
ffe60014
DG
131static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
132 struct lttng_consumer_channel *channel,
133 struct lttng_consumer_local_data *ctx, int *_alloc_ret)
134{
135 int alloc_ret;
136 struct lttng_consumer_stream *stream = NULL;
137
138 assert(channel);
139 assert(ctx);
140
29d1a7ae 141 stream = consumer_stream_create(
59db0d42
JG
142 channel,
143 channel->key,
ffe60014
DG
144 key,
145 LTTNG_CONSUMER_ACTIVE_STREAM,
146 channel->name,
147 channel->uid,
148 channel->gid,
149 channel->relayd_id,
150 channel->session_id,
151 cpu,
152 &alloc_ret,
4891ece8
DG
153 channel->type,
154 channel->monitor);
ffe60014
DG
155 if (stream == NULL) {
156 switch (alloc_ret) {
157 case -ENOENT:
158 /*
159 * We could not find the channel. Can happen if cpu hotplug
160 * happens while tearing down.
161 */
162 DBG3("Could not find channel");
163 break;
164 case -ENOMEM:
165 case -EINVAL:
166 default:
167 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
168 break;
169 }
170 goto error;
171 }
172
ffe60014
DG
173error:
174 if (_alloc_ret) {
175 *_alloc_ret = alloc_ret;
176 }
177 return stream;
178}
179
180/*
181 * Send the given stream pointer to the corresponding thread.
182 *
183 * Returns 0 on success else a negative value.
184 */
185static int send_stream_to_thread(struct lttng_consumer_stream *stream,
186 struct lttng_consumer_local_data *ctx)
187{
dae10966
DG
188 int ret;
189 struct lttng_pipe *stream_pipe;
ffe60014
DG
190
191 /* Get the right pipe where the stream will be sent. */
192 if (stream->metadata_flag) {
5ab66908
MD
193 ret = consumer_add_metadata_stream(stream);
194 if (ret) {
195 ERR("Consumer add metadata stream %" PRIu64 " failed.",
196 stream->key);
197 goto error;
198 }
dae10966 199 stream_pipe = ctx->consumer_metadata_pipe;
ffe60014 200 } else {
5ab66908
MD
201 ret = consumer_add_data_stream(stream);
202 if (ret) {
203 ERR("Consumer add stream %" PRIu64 " failed.",
204 stream->key);
205 goto error;
206 }
dae10966 207 stream_pipe = ctx->consumer_data_pipe;
ffe60014
DG
208 }
209
5ab66908
MD
210 /*
211 * From this point on, the stream's ownership has been moved away from
212 * the channel and becomes globally visible.
213 */
214 stream->globally_visible = 1;
215
dae10966 216 ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
ffe60014 217 if (ret < 0) {
dae10966
DG
218 ERR("Consumer write %s stream to pipe %d",
219 stream->metadata_flag ? "metadata" : "data",
220 lttng_pipe_get_writefd(stream_pipe));
5ab66908
MD
221 if (stream->metadata_flag) {
222 consumer_del_stream_for_metadata(stream);
223 } else {
224 consumer_del_stream_for_data(stream);
225 }
ffe60014 226 }
5ab66908 227error:
ffe60014
DG
228 return ret;
229}
230
4628484a
MD
231static
232int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu)
233{
45863397 234 char cpu_nr[INT_MAX_STR_LEN]; /* int max len */
4628484a
MD
235 int ret;
236
237 strncpy(stream_shm_path, shm_path, PATH_MAX);
238 stream_shm_path[PATH_MAX - 1] = '\0';
45863397 239 ret = snprintf(cpu_nr, INT_MAX_STR_LEN, "%i", cpu);
67f8cb8d
MD
240 if (ret < 0) {
241 PERROR("snprintf");
4628484a
MD
242 goto end;
243 }
244 strncat(stream_shm_path, cpu_nr,
245 PATH_MAX - strlen(stream_shm_path) - 1);
246 ret = 0;
247end:
248 return ret;
249}
250
d88aee68
DG
251/*
252 * Create streams for the given channel using liblttng-ust-ctl.
253 *
254 * Return 0 on success else a negative value.
255 */
ffe60014
DG
256static int create_ust_streams(struct lttng_consumer_channel *channel,
257 struct lttng_consumer_local_data *ctx)
258{
259 int ret, cpu = 0;
260 struct ustctl_consumer_stream *ustream;
261 struct lttng_consumer_stream *stream;
262
263 assert(channel);
264 assert(ctx);
265
266 /*
267 * While a stream is available from ustctl. When NULL is returned, we've
268 * reached the end of the possible stream for the channel.
269 */
270 while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
271 int wait_fd;
04ef1097 272 int ust_metadata_pipe[2];
ffe60014 273
9ce5646a
MD
274 health_code_update();
275
04ef1097
MD
276 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
277 ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
278 if (ret < 0) {
279 ERR("Create ust metadata poll pipe");
280 goto error;
281 }
282 wait_fd = ust_metadata_pipe[0];
283 } else {
284 wait_fd = ustctl_stream_get_wait_fd(ustream);
285 }
ffe60014
DG
286
287 /* Allocate consumer stream object. */
288 stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
289 if (!stream) {
290 goto error_alloc;
291 }
292 stream->ustream = ustream;
293 /*
294 * Store it so we can save multiple function calls afterwards since
295 * this value is used heavily in the stream threads. This is UST
296 * specific so this is why it's done after allocation.
297 */
298 stream->wait_fd = wait_fd;
299
b31398bb
DG
300 /*
301 * Increment channel refcount since the channel reference has now been
302 * assigned in the allocation process above.
303 */
10a50311
JD
304 if (stream->chan->monitor) {
305 uatomic_inc(&stream->chan->refcount);
306 }
b31398bb 307
ffe60014
DG
308 /*
309 * Order is important this is why a list is used. On error, the caller
310 * should clean this list.
311 */
312 cds_list_add_tail(&stream->send_node, &channel->streams.head);
313
314 ret = ustctl_get_max_subbuf_size(stream->ustream,
315 &stream->max_sb_size);
316 if (ret < 0) {
317 ERR("ustctl_get_max_subbuf_size failed for stream %s",
318 stream->name);
319 goto error;
320 }
321
322 /* Do actions once stream has been received. */
323 if (ctx->on_recv_stream) {
324 ret = ctx->on_recv_stream(stream);
325 if (ret < 0) {
326 goto error;
327 }
328 }
329
d88aee68 330 DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
ffe60014
DG
331 stream->name, stream->key, stream->relayd_stream_id);
332
333 /* Set next CPU stream. */
334 channel->streams.count = ++cpu;
d88aee68
DG
335
336 /* Keep stream reference when creating metadata. */
337 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
338 channel->metadata_stream = stream;
8de4f941
JG
339 if (channel->monitor) {
340 /* Set metadata poll pipe if we created one */
341 memcpy(stream->ust_metadata_poll_pipe,
342 ust_metadata_pipe,
343 sizeof(ust_metadata_pipe));
344 }
d88aee68 345 }
ffe60014
DG
346 }
347
348 return 0;
349
350error:
351error_alloc:
352 return ret;
353}
354
4628484a
MD
355/*
356 * create_posix_shm is never called concurrently within a process.
357 */
358static
359int create_posix_shm(void)
360{
361 char tmp_name[NAME_MAX];
362 int shmfd, ret;
363
364 ret = snprintf(tmp_name, NAME_MAX, "/ust-shm-consumer-%d", getpid());
365 if (ret < 0) {
366 PERROR("snprintf");
367 return -1;
368 }
369 /*
370 * Allocate shm, and immediately unlink its shm oject, keeping
371 * only the file descriptor as a reference to the object.
372 * We specifically do _not_ use the / at the beginning of the
373 * pathname so that some OS implementations can keep it local to
374 * the process (POSIX leaves this implementation-defined).
375 */
376 shmfd = shm_open(tmp_name, O_CREAT | O_EXCL | O_RDWR, 0700);
377 if (shmfd < 0) {
378 PERROR("shm_open");
379 goto error_shm_open;
380 }
381 ret = shm_unlink(tmp_name);
382 if (ret < 0 && errno != ENOENT) {
383 PERROR("shm_unlink");
384 goto error_shm_release;
385 }
386 return shmfd;
387
388error_shm_release:
389 ret = close(shmfd);
390 if (ret) {
391 PERROR("close");
392 }
393error_shm_open:
394 return -1;
395}
396
397static int open_ust_stream_fd(struct lttng_consumer_channel *channel,
398 struct ustctl_consumer_channel_attr *attr,
399 int cpu)
400{
401 char shm_path[PATH_MAX];
402 int ret;
403
404 if (!channel->shm_path[0]) {
405 return create_posix_shm();
406 }
407 ret = get_stream_shm_path(shm_path, channel->shm_path, cpu);
408 if (ret) {
409 goto error_shm_path;
410 }
411 return run_as_open(shm_path,
412 O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR,
413 channel->uid, channel->gid);
414
415error_shm_path:
416 return -1;
417}
418
ffe60014
DG
419/*
420 * Create an UST channel with the given attributes and send it to the session
421 * daemon using the ust ctl API.
422 *
423 * Return 0 on success or else a negative value.
424 */
4628484a
MD
425static int create_ust_channel(struct lttng_consumer_channel *channel,
426 struct ustctl_consumer_channel_attr *attr,
427 struct ustctl_consumer_channel **ust_chanp)
ffe60014 428{
4628484a
MD
429 int ret, nr_stream_fds, i, j;
430 int *stream_fds;
431 struct ustctl_consumer_channel *ust_channel;
ffe60014 432
4628484a 433 assert(channel);
ffe60014 434 assert(attr);
4628484a 435 assert(ust_chanp);
ffe60014
DG
436
437 DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
438 "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
439 "switch_timer_interval: %u, read_timer_interval: %u, "
440 "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
441 attr->num_subbuf, attr->switch_timer_interval,
442 attr->read_timer_interval, attr->output, attr->type);
443
4628484a
MD
444 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA)
445 nr_stream_fds = 1;
446 else
447 nr_stream_fds = ustctl_get_nr_stream_per_channel();
448 stream_fds = zmalloc(nr_stream_fds * sizeof(*stream_fds));
449 if (!stream_fds) {
450 ret = -1;
451 goto error_alloc;
452 }
453 for (i = 0; i < nr_stream_fds; i++) {
454 stream_fds[i] = open_ust_stream_fd(channel, attr, i);
455 if (stream_fds[i] < 0) {
456 ret = -1;
457 goto error_open;
458 }
459 }
460 ust_channel = ustctl_create_channel(attr, stream_fds, nr_stream_fds);
461 if (!ust_channel) {
ffe60014
DG
462 ret = -1;
463 goto error_create;
464 }
4628484a
MD
465 channel->nr_stream_fds = nr_stream_fds;
466 channel->stream_fds = stream_fds;
467 *ust_chanp = ust_channel;
ffe60014
DG
468
469 return 0;
470
471error_create:
4628484a
MD
472error_open:
473 for (j = i - 1; j >= 0; j--) {
474 int closeret;
475
476 closeret = close(stream_fds[j]);
477 if (closeret) {
478 PERROR("close");
479 }
480 if (channel->shm_path[0]) {
481 char shm_path[PATH_MAX];
482
483 closeret = get_stream_shm_path(shm_path,
484 channel->shm_path, j);
485 if (closeret) {
486 ERR("Cannot get stream shm path");
487 }
488 closeret = run_as_unlink(shm_path,
489 channel->uid, channel->gid);
490 if (closeret) {
4628484a
MD
491 PERROR("unlink %s", shm_path);
492 }
493 }
494 }
495 /* Try to rmdir all directories under shm_path root. */
496 if (channel->root_shm_path[0]) {
497 (void) run_as_recursive_rmdir(channel->root_shm_path,
498 channel->uid, channel->gid);
499 }
500 free(stream_fds);
501error_alloc:
ffe60014
DG
502 return ret;
503}
504
d88aee68
DG
505/*
506 * Send a single given stream to the session daemon using the sock.
507 *
508 * Return 0 on success else a negative value.
509 */
ffe60014
DG
510static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
511{
512 int ret;
513
514 assert(stream);
515 assert(sock >= 0);
516
3eb914c0 517 DBG("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
ffe60014
DG
518
519 /* Send stream to session daemon. */
520 ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
521 if (ret < 0) {
522 goto error;
523 }
524
ffe60014
DG
525error:
526 return ret;
527}
528
529/*
530 * Send channel to sessiond.
531 *
d88aee68 532 * Return 0 on success or else a negative value.
ffe60014
DG
533 */
534static int send_sessiond_channel(int sock,
535 struct lttng_consumer_channel *channel,
536 struct lttng_consumer_local_data *ctx, int *relayd_error)
537{
0c759fc9 538 int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
ffe60014
DG
539 struct lttng_consumer_stream *stream;
540
541 assert(channel);
542 assert(ctx);
543 assert(sock >= 0);
544
545 DBG("UST consumer sending channel %s to sessiond", channel->name);
546
62285ea4 547 if (channel->relayd_id != (uint64_t) -1ULL) {
0934dd7e
JR
548 ret = consumer_send_relayd_channel_bulk(channel);
549 if (ret < 0) {
550 /*
551 * Flag that the relayd was the problem here probably due to a
552 * communicaton error on the socket.
553 */
554 if (relayd_error) {
555 *relayd_error = 1;
a4baae1b 556 }
0934dd7e 557 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
a4baae1b 558 }
f2a444f1 559 }
ffe60014 560
f2a444f1
DG
561 /* Inform sessiond that we are about to send channel and streams. */
562 ret = consumer_send_status_msg(sock, ret_code);
0c759fc9 563 if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
f2a444f1
DG
564 /*
565 * Either the session daemon is not responding or the relayd died so we
566 * stop now.
567 */
568 goto error;
569 }
570
571 /* Send channel to sessiond. */
572 ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
573 if (ret < 0) {
574 goto error;
575 }
576
577 ret = ustctl_channel_close_wakeup_fd(channel->uchan);
578 if (ret < 0) {
579 goto error;
580 }
581
582 /* The channel was sent successfully to the sessiond at this point. */
583 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
9ce5646a
MD
584
585 health_code_update();
586
ffe60014
DG
587 /* Send stream to session daemon. */
588 ret = send_sessiond_stream(sock, stream);
589 if (ret < 0) {
590 goto error;
591 }
592 }
593
594 /* Tell sessiond there is no more stream. */
595 ret = ustctl_send_stream_to_sessiond(sock, NULL);
596 if (ret < 0) {
597 goto error;
598 }
599
600 DBG("UST consumer NULL stream sent to sessiond");
601
602 return 0;
603
604error:
0c759fc9 605 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
f2a444f1
DG
606 ret = -1;
607 }
ffe60014
DG
608 return ret;
609}
610
611/*
612 * Creates a channel and streams and add the channel it to the channel internal
613 * state. The created stream must ONLY be sent once the GET_CHANNEL command is
614 * received.
615 *
616 * Return 0 on success or else, a negative value is returned and the channel
617 * MUST be destroyed by consumer_del_channel().
618 */
619static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
620 struct lttng_consumer_channel *channel,
621 struct ustctl_consumer_channel_attr *attr)
3bd1e081
MD
622{
623 int ret;
624
ffe60014
DG
625 assert(ctx);
626 assert(channel);
627 assert(attr);
628
629 /*
630 * This value is still used by the kernel consumer since for the kernel,
631 * the stream ownership is not IN the consumer so we need to have the
632 * number of left stream that needs to be initialized so we can know when
633 * to delete the channel (see consumer.c).
634 *
635 * As for the user space tracer now, the consumer creates and sends the
636 * stream to the session daemon which only sends them to the application
637 * once every stream of a channel is received making this value useless
638 * because we they will be added to the poll thread before the application
639 * receives them. This ensures that a stream can not hang up during
640 * initilization of a channel.
641 */
642 channel->nb_init_stream_left = 0;
643
644 /* The reply msg status is handled in the following call. */
4628484a 645 ret = create_ust_channel(channel, attr, &channel->uchan);
ffe60014 646 if (ret < 0) {
10a50311 647 goto end;
3bd1e081
MD
648 }
649
d8ef542d
MD
650 channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
651
10a50311
JD
652 /*
653 * For the snapshots (no monitor), we create the metadata streams
654 * on demand, not during the channel creation.
655 */
656 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && !channel->monitor) {
657 ret = 0;
658 goto end;
659 }
660
ffe60014
DG
661 /* Open all streams for this channel. */
662 ret = create_ust_streams(channel, ctx);
663 if (ret < 0) {
10a50311 664 goto end;
ffe60014
DG
665 }
666
10a50311 667end:
3bd1e081
MD
668 return ret;
669}
670
d88aee68
DG
671/*
672 * Send all stream of a channel to the right thread handling it.
673 *
674 * On error, return a negative value else 0 on success.
675 */
676static int send_streams_to_thread(struct lttng_consumer_channel *channel,
677 struct lttng_consumer_local_data *ctx)
678{
679 int ret = 0;
680 struct lttng_consumer_stream *stream, *stmp;
681
682 assert(channel);
683 assert(ctx);
684
685 /* Send streams to the corresponding thread. */
686 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
687 send_node) {
9ce5646a
MD
688
689 health_code_update();
690
d88aee68
DG
691 /* Sending the stream to the thread. */
692 ret = send_stream_to_thread(stream, ctx);
693 if (ret < 0) {
694 /*
695 * If we are unable to send the stream to the thread, there is
696 * a big problem so just stop everything.
697 */
5ab66908
MD
698 /* Remove node from the channel stream list. */
699 cds_list_del(&stream->send_node);
d88aee68
DG
700 goto error;
701 }
702
703 /* Remove node from the channel stream list. */
704 cds_list_del(&stream->send_node);
4891ece8 705
d88aee68
DG
706 }
707
708error:
709 return ret;
710}
711
7972aab2
DG
712/*
713 * Flush channel's streams using the given key to retrieve the channel.
714 *
715 * Return 0 on success else an LTTng error code.
716 */
717static int flush_channel(uint64_t chan_key)
718{
719 int ret = 0;
720 struct lttng_consumer_channel *channel;
721 struct lttng_consumer_stream *stream;
722 struct lttng_ht *ht;
723 struct lttng_ht_iter iter;
724
8fd623e0 725 DBG("UST consumer flush channel key %" PRIu64, chan_key);
7972aab2 726
a500c257 727 rcu_read_lock();
7972aab2
DG
728 channel = consumer_find_channel(chan_key);
729 if (!channel) {
8fd623e0 730 ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
7972aab2
DG
731 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
732 goto error;
733 }
734
735 ht = consumer_data.stream_per_chan_id_ht;
736
737 /* For each stream of the channel id, flush it. */
7972aab2
DG
738 cds_lfht_for_each_entry_duplicate(ht->ht,
739 ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
740 &channel->key, &iter.iter, stream, node_channel_id.node) {
9ce5646a
MD
741
742 health_code_update();
743
0dd01979 744 pthread_mutex_lock(&stream->lock);
123fff97
JR
745
746 /*
747 * Protect against concurrent teardown of a stream.
748 */
749 if (cds_lfht_is_node_deleted(&stream->node.node)) {
750 goto next;
751 }
752
0dd01979
MD
753 if (!stream->quiescent) {
754 ustctl_flush_buffer(stream->ustream, 0);
755 stream->quiescent = true;
756 }
123fff97 757next:
0dd01979
MD
758 pthread_mutex_unlock(&stream->lock);
759 }
760error:
761 rcu_read_unlock();
762 return ret;
763}
764
765/*
766 * Clear quiescent state from channel's streams using the given key to
767 * retrieve the channel.
768 *
769 * Return 0 on success else an LTTng error code.
770 */
771static int clear_quiescent_channel(uint64_t chan_key)
772{
773 int ret = 0;
774 struct lttng_consumer_channel *channel;
775 struct lttng_consumer_stream *stream;
776 struct lttng_ht *ht;
777 struct lttng_ht_iter iter;
778
779 DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
780
781 rcu_read_lock();
782 channel = consumer_find_channel(chan_key);
783 if (!channel) {
784 ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
785 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
786 goto error;
787 }
788
789 ht = consumer_data.stream_per_chan_id_ht;
790
791 /* For each stream of the channel id, clear quiescent state. */
792 cds_lfht_for_each_entry_duplicate(ht->ht,
793 ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
794 &channel->key, &iter.iter, stream, node_channel_id.node) {
795
796 health_code_update();
797
798 pthread_mutex_lock(&stream->lock);
799 stream->quiescent = false;
800 pthread_mutex_unlock(&stream->lock);
7972aab2 801 }
7972aab2 802error:
a500c257 803 rcu_read_unlock();
7972aab2
DG
804 return ret;
805}
806
d88aee68
DG
807/*
808 * Close metadata stream wakeup_fd using the given key to retrieve the channel.
a500c257 809 * RCU read side lock MUST be acquired before calling this function.
d88aee68
DG
810 *
811 * Return 0 on success else an LTTng error code.
812 */
813static int close_metadata(uint64_t chan_key)
814{
ea88ca2a 815 int ret = 0;
d88aee68 816 struct lttng_consumer_channel *channel;
a1ca62da 817 unsigned int channel_monitor;
d88aee68 818
8fd623e0 819 DBG("UST consumer close metadata key %" PRIu64, chan_key);
d88aee68
DG
820
821 channel = consumer_find_channel(chan_key);
822 if (!channel) {
84cc9aa0
DG
823 /*
824 * This is possible if the metadata thread has issue a delete because
825 * the endpoint point of the stream hung up. There is no way the
826 * session daemon can know about it thus use a DBG instead of an actual
827 * error.
828 */
829 DBG("UST consumer close metadata %" PRIu64 " not found", chan_key);
d88aee68
DG
830 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
831 goto error;
832 }
833
ea88ca2a 834 pthread_mutex_lock(&consumer_data.lock);
a9838785 835 pthread_mutex_lock(&channel->lock);
a1ca62da 836 channel_monitor = channel->monitor;
73811ecc
DG
837 if (cds_lfht_is_node_deleted(&channel->node.node)) {
838 goto error_unlock;
839 }
840
6d574024 841 lttng_ustconsumer_close_metadata(channel);
a1ca62da
JG
842 pthread_mutex_unlock(&channel->lock);
843 pthread_mutex_unlock(&consumer_data.lock);
d88aee68 844
a1ca62da
JG
845 /*
846 * The ownership of a metadata channel depends on the type of
847 * session to which it belongs. In effect, the monitor flag is checked
848 * to determine if this metadata channel is in "snapshot" mode or not.
849 *
850 * In the non-snapshot case, the metadata channel is created along with
851 * a single stream which will remain present until the metadata channel
852 * is destroyed (on the destruction of its session). In this case, the
853 * metadata stream in "monitored" by the metadata poll thread and holds
854 * the ownership of its channel.
855 *
856 * Closing the metadata will cause the metadata stream's "metadata poll
857 * pipe" to be closed. Closing this pipe will wake-up the metadata poll
858 * thread which will teardown the metadata stream which, in return,
859 * deletes the metadata channel.
860 *
861 * In the snapshot case, the metadata stream is created and destroyed
862 * on every snapshot record. Since the channel doesn't have an owner
863 * other than the session daemon, it is safe to destroy it immediately
864 * on reception of the CLOSE_METADATA command.
865 */
866 if (!channel_monitor) {
867 /*
868 * The channel and consumer_data locks must be
869 * released before this call since consumer_del_channel
870 * re-acquires the channel and consumer_data locks to teardown
871 * the channel and queue its reclamation by the "call_rcu"
872 * worker thread.
873 */
874 consumer_del_channel(channel);
875 }
876
877 return ret;
ea88ca2a 878error_unlock:
a9838785 879 pthread_mutex_unlock(&channel->lock);
ea88ca2a 880 pthread_mutex_unlock(&consumer_data.lock);
d88aee68
DG
881error:
882 return ret;
883}
884
885/*
886 * RCU read side lock MUST be acquired before calling this function.
887 *
888 * Return 0 on success else an LTTng error code.
889 */
890static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
891{
892 int ret;
893 struct lttng_consumer_channel *metadata;
894
8fd623e0 895 DBG("UST consumer setup metadata key %" PRIu64, key);
d88aee68
DG
896
897 metadata = consumer_find_channel(key);
898 if (!metadata) {
899 ERR("UST consumer push metadata %" PRIu64 " not found", key);
900 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
10a50311
JD
901 goto end;
902 }
903
904 /*
905 * In no monitor mode, the metadata channel has no stream(s) so skip the
906 * ownership transfer to the metadata thread.
907 */
908 if (!metadata->monitor) {
909 DBG("Metadata channel in no monitor");
910 ret = 0;
911 goto end;
d88aee68
DG
912 }
913
914 /*
915 * Send metadata stream to relayd if one available. Availability is
916 * known if the stream is still in the list of the channel.
917 */
918 if (cds_list_empty(&metadata->streams.head)) {
919 ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
920 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
f5a0c9cf 921 goto error_no_stream;
d88aee68
DG
922 }
923
924 /* Send metadata stream to relayd if needed. */
6d40f8fa 925 if (metadata->metadata_stream->relayd_id != (uint64_t) -1ULL) {
62285ea4
DG
926 ret = consumer_send_relayd_stream(metadata->metadata_stream,
927 metadata->pathname);
928 if (ret < 0) {
929 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
930 goto error;
931 }
601262d6 932 ret = consumer_send_relayd_streams_sent(
6d40f8fa 933 metadata->metadata_stream->relayd_id);
601262d6
JD
934 if (ret < 0) {
935 ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
936 goto error;
937 }
d88aee68
DG
938 }
939
940 ret = send_streams_to_thread(metadata, ctx);
941 if (ret < 0) {
942 /*
943 * If we are unable to send the stream to the thread, there is
944 * a big problem so just stop everything.
945 */
946 ret = LTTCOMM_CONSUMERD_FATAL;
947 goto error;
948 }
949 /* List MUST be empty after or else it could be reused. */
950 assert(cds_list_empty(&metadata->streams.head));
951
10a50311
JD
952 ret = 0;
953 goto end;
d88aee68
DG
954
955error:
f2a444f1
DG
956 /*
957 * Delete metadata channel on error. At this point, the metadata stream can
958 * NOT be monitored by the metadata thread thus having the guarantee that
959 * the stream is still in the local stream list of the channel. This call
960 * will make sure to clean that list.
961 */
f5a0c9cf 962 consumer_stream_destroy(metadata->metadata_stream, NULL);
212d67a2
DG
963 cds_list_del(&metadata->metadata_stream->send_node);
964 metadata->metadata_stream = NULL;
f5a0c9cf 965error_no_stream:
10a50311
JD
966end:
967 return ret;
968}
969
970/*
971 * Snapshot the whole metadata.
972 *
973 * Returns 0 on success, < 0 on error
974 */
975static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
976 struct lttng_consumer_local_data *ctx)
977{
978 int ret = 0;
10a50311
JD
979 struct lttng_consumer_channel *metadata_channel;
980 struct lttng_consumer_stream *metadata_stream;
981
982 assert(path);
983 assert(ctx);
984
985 DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
986 key, path);
987
988 rcu_read_lock();
989
990 metadata_channel = consumer_find_channel(key);
991 if (!metadata_channel) {
6a00837f
MD
992 ERR("UST snapshot metadata channel not found for key %" PRIu64,
993 key);
10a50311
JD
994 ret = -1;
995 goto error;
996 }
997 assert(!metadata_channel->monitor);
998
9ce5646a
MD
999 health_code_update();
1000
10a50311
JD
1001 /*
1002 * Ask the sessiond if we have new metadata waiting and update the
1003 * consumer metadata cache.
1004 */
94d49140 1005 ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
10a50311
JD
1006 if (ret < 0) {
1007 goto error;
1008 }
1009
9ce5646a
MD
1010 health_code_update();
1011
10a50311
JD
1012 /*
1013 * The metadata stream is NOT created in no monitor mode when the channel
1014 * is created on a sessiond ask channel command.
1015 */
1016 ret = create_ust_streams(metadata_channel, ctx);
1017 if (ret < 0) {
1018 goto error;
1019 }
1020
1021 metadata_stream = metadata_channel->metadata_stream;
1022 assert(metadata_stream);
1023
1024 if (relayd_id != (uint64_t) -1ULL) {
6d40f8fa 1025 metadata_stream->relayd_id = relayd_id;
10a50311
JD
1026 ret = consumer_send_relayd_stream(metadata_stream, path);
1027 if (ret < 0) {
1028 goto error_stream;
1029 }
1030 } else {
1031 ret = utils_create_stream_file(path, metadata_stream->name,
1032 metadata_stream->chan->tracefile_size,
1033 metadata_stream->tracefile_count_current,
309167d2 1034 metadata_stream->uid, metadata_stream->gid, NULL);
10a50311
JD
1035 if (ret < 0) {
1036 goto error_stream;
1037 }
1038 metadata_stream->out_fd = ret;
1039 metadata_stream->tracefile_size_current = 0;
1040 }
1041
04ef1097 1042 do {
9ce5646a
MD
1043 health_code_update();
1044
29d1a7ae 1045 ret = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
10a50311 1046 if (ret < 0) {
94d49140 1047 goto error_stream;
10a50311 1048 }
04ef1097 1049 } while (ret > 0);
10a50311 1050
10a50311
JD
1051error_stream:
1052 /*
1053 * Clean up the stream completly because the next snapshot will use a new
1054 * metadata stream.
1055 */
10a50311 1056 consumer_stream_destroy(metadata_stream, NULL);
212d67a2 1057 cds_list_del(&metadata_stream->send_node);
10a50311
JD
1058 metadata_channel->metadata_stream = NULL;
1059
1060error:
1061 rcu_read_unlock();
1062 return ret;
1063}
1064
1fdb9a78
JG
1065static
1066int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
1067 const char **addr)
1068{
1069 int ret;
1070 unsigned long mmap_offset;
1071 const char *mmap_base;
1072
1073 mmap_base = ustctl_get_mmap_base(stream->ustream);
1074 if (!mmap_base) {
1075 ERR("Failed to get mmap base for stream `%s`",
1076 stream->name);
1077 ret = -EPERM;
1078 goto error;
1079 }
1080
1081 ret = ustctl_get_mmap_read_offset(stream->ustream, &mmap_offset);
1082 if (ret != 0) {
1083 ERR("Failed to get mmap offset for stream `%s`", stream->name);
1084 ret = -EINVAL;
1085 goto error;
1086 }
1087
1088 *addr = mmap_base + mmap_offset;
1089error:
1090 return ret;
1091
1092}
1093
/*
 * Take a snapshot of all the streams of a channel (no-monitor mode only).
 *
 * For each stream of the channel: the stream is bound to the relayd (or a
 * local file is created), the buffer is flushed, and every sub-buffer between
 * the consumed and produced positions is read out with the mmap path.
 *
 * key                   - channel key to snapshot.
 * path                  - relayd path or local output directory.
 * relayd_id             - relayd index, or (uint64_t) -1ULL for local output.
 * nb_packets_per_stream - cap on packets per stream (0 means no cap,
 *                         see consumer_get_consume_start_pos()).
 *
 * Returns 0 on success, < 0 on error
 */
static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
		uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx)
{
	int ret;
	unsigned use_relayd = 0;
	unsigned long consumed_pos, produced_pos;
	struct lttng_consumer_channel *channel;
	struct lttng_consumer_stream *stream;

	assert(path);
	assert(ctx);

	rcu_read_lock();

	if (relayd_id != (uint64_t) -1ULL) {
		use_relayd = 1;
	}

	channel = consumer_find_channel(key);
	if (!channel) {
		ERR("UST snapshot channel not found for key %" PRIu64, key);
		ret = -1;
		goto error;
	}
	/* Snapshot is only supported on non-monitored channels. */
	assert(!channel->monitor);
	DBG("UST consumer snapshot channel %" PRIu64, key);

	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
		health_code_update();

		/* Lock stream because we are about to change its state. */
		pthread_mutex_lock(&stream->lock);
		stream->relayd_id = relayd_id;

		if (use_relayd) {
			ret = consumer_send_relayd_stream(stream, path);
			if (ret < 0) {
				goto error_unlock;
			}
		} else {
			ret = utils_create_stream_file(path, stream->name,
					stream->chan->tracefile_size,
					stream->tracefile_count_current,
					stream->uid, stream->gid, NULL);
			if (ret < 0) {
				goto error_unlock;
			}
			stream->out_fd = ret;
			stream->tracefile_size_current = 0;

			DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path,
					stream->name, stream->key);
		}
		/*
		 * NOTE(review): streams_sent is emitted once per stream here,
		 * inside the loop — presumably required by the 2-stage
		 * relayd add_stream protocol this tree carries; confirm
		 * against the relayd side before changing.
		 */
		if (relayd_id != -1ULL) {
			ret = consumer_send_relayd_streams_sent(relayd_id);
			if (ret < 0) {
				goto error_unlock;
			}
		}

		/*
		 * If tracing is active, we want to perform a "full" buffer flush.
		 * Else, if quiescent, it has already been done by the prior stop.
		 */
		if (!stream->quiescent) {
			ustctl_flush_buffer(stream->ustream, 0);
		}

		ret = lttng_ustconsumer_take_snapshot(stream);
		if (ret < 0) {
			ERR("Taking UST snapshot");
			goto error_unlock;
		}

		ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
		if (ret < 0) {
			ERR("Produced UST snapshot position");
			goto error_unlock;
		}

		ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
		if (ret < 0) {
			ERR("Consumerd UST snapshot position");
			goto error_unlock;
		}

		/*
		 * The original value is sent back if max stream size is larger than
		 * the possible size of the snapshot. Also, we assume that the session
		 * daemon should never send a maximum stream size that is lower than
		 * subbuffer size.
		 */
		consumed_pos = consumer_get_consume_start_pos(consumed_pos,
				produced_pos, nb_packets_per_stream,
				stream->max_sb_size);

		/* Read every sub-buffer between the two snapshot positions. */
		while (consumed_pos < produced_pos) {
			ssize_t read_len;
			unsigned long len, padded_len;
			const char *subbuf_addr;
			struct lttng_buffer_view subbuf_view;

			health_code_update();

			DBG("UST consumer taking snapshot at pos %lu", consumed_pos);

			ret = ustctl_get_subbuf(stream->ustream, &consumed_pos);
			if (ret < 0) {
				if (ret != -EAGAIN) {
					PERROR("ustctl_get_subbuf snapshot");
					goto error_close_stream;
				}
				/*
				 * -EAGAIN: sub-buffer not available; skip it
				 * and account it as a lost packet.
				 */
				DBG("UST consumer get subbuf failed. Skipping it.");
				consumed_pos += stream->max_sb_size;
				stream->chan->lost_packets++;
				continue;
			}

			ret = ustctl_get_subbuf_size(stream->ustream, &len);
			if (ret < 0) {
				ERR("Snapshot ustctl_get_subbuf_size");
				goto error_put_subbuf;
			}

			ret = ustctl_get_padded_subbuf_size(stream->ustream, &padded_len);
			if (ret < 0) {
				ERR("Snapshot ustctl_get_padded_subbuf_size");
				goto error_put_subbuf;
			}

			ret = get_current_subbuf_addr(stream, &subbuf_addr);
			if (ret) {
				goto error_put_subbuf;
			}

			subbuf_view = lttng_buffer_view_init(
					subbuf_addr, 0, padded_len);
			read_len = lttng_consumer_on_read_subbuffer_mmap(
					stream, &subbuf_view, padded_len - len);
			/*
			 * Over the network only the payload (len) is sent;
			 * locally the whole padded sub-buffer is written.
			 */
			if (use_relayd) {
				if (read_len != len) {
					ret = -EPERM;
					goto error_put_subbuf;
				}
			} else {
				if (read_len != padded_len) {
					ret = -EPERM;
					goto error_put_subbuf;
				}
			}

			ret = ustctl_put_subbuf(stream->ustream);
			if (ret < 0) {
				ERR("Snapshot ustctl_put_subbuf");
				goto error_close_stream;
			}
			consumed_pos += stream->max_sb_size;
		}

		/* Simply close the stream so we can use it on the next snapshot. */
		consumer_stream_close(stream);
		pthread_mutex_unlock(&stream->lock);
	}

	rcu_read_unlock();
	return 0;

error_put_subbuf:
	if (ustctl_put_subbuf(stream->ustream) < 0) {
		ERR("Snapshot ustctl_put_subbuf");
	}
error_close_stream:
	consumer_stream_close(stream);
error_unlock:
	pthread_mutex_unlock(&stream->lock);
error:
	rcu_read_unlock();
	return ret;
}
1278
331744e3 1279/*
c585821b
MD
1280 * Receive the metadata updates from the sessiond. Supports receiving
1281 * overlapping metadata, but is needs to always belong to a contiguous
1282 * range starting from 0.
1283 * Be careful about the locks held when calling this function: it needs
1284 * the metadata cache flush to concurrently progress in order to
1285 * complete.
331744e3
JD
1286 */
1287int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
93ec662e
JD
1288 uint64_t len, uint64_t version,
1289 struct lttng_consumer_channel *channel, int timer, int wait)
331744e3 1290{
0c759fc9 1291 int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
331744e3
JD
1292 char *metadata_str;
1293
8fd623e0 1294 DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
331744e3
JD
1295
1296 metadata_str = zmalloc(len * sizeof(char));
1297 if (!metadata_str) {
1298 PERROR("zmalloc metadata string");
1299 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
1300 goto end;
1301 }
1302
9ce5646a
MD
1303 health_code_update();
1304
331744e3
JD
1305 /* Receive metadata string. */
1306 ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
1307 if (ret < 0) {
1308 /* Session daemon is dead so return gracefully. */
1309 ret_code = ret;
1310 goto end_free;
1311 }
1312
9ce5646a
MD
1313 health_code_update();
1314
331744e3 1315 pthread_mutex_lock(&channel->metadata_cache->lock);
93ec662e
JD
1316 ret = consumer_metadata_cache_write(channel, offset, len, version,
1317 metadata_str);
331744e3
JD
1318 if (ret < 0) {
1319 /* Unable to handle metadata. Notify session daemon. */
1320 ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
a32bd775
DG
1321 /*
1322 * Skip metadata flush on write error since the offset and len might
1323 * not have been updated which could create an infinite loop below when
1324 * waiting for the metadata cache to be flushed.
1325 */
1326 pthread_mutex_unlock(&channel->metadata_cache->lock);
a32bd775 1327 goto end_free;
331744e3
JD
1328 }
1329 pthread_mutex_unlock(&channel->metadata_cache->lock);
1330
94d49140
JD
1331 if (!wait) {
1332 goto end_free;
1333 }
5e41ebe1 1334 while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
331744e3 1335 DBG("Waiting for metadata to be flushed");
9ce5646a
MD
1336
1337 health_code_update();
1338
331744e3
JD
1339 usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
1340 }
1341
1342end_free:
1343 free(metadata_str);
1344end:
1345 return ret_code;
1346}
1347
/*
 * Receive command from session daemon and process it.
 *
 * A single struct lttcomm_consumer_msg is read from 'sock' and dispatched on
 * msg.cmd_type. Most commands reply with a status message on the same socket
 * (end_msg_sessiond); a few send a raw value back instead.
 *
 * Return 1 on success else a negative value or 0.
 */
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
		int sock, struct pollfd *consumer_sockpoll)
{
	ssize_t ret;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttcomm_consumer_msg msg;
	struct lttng_consumer_channel *channel = NULL;

	health_code_update();

	ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
	if (ret != sizeof(msg)) {
		DBG("Consumer received unexpected message size %zd (expects %zu)",
			ret, sizeof(msg));
		/*
		 * The ret value might 0 meaning an orderly shutdown but this is ok
		 * since the caller handles this.
		 */
		if (ret > 0) {
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
			ret = -1;
		}
		return ret;
	}

	health_code_update();

	/* deprecated */
	assert(msg.cmd_type != LTTNG_CONSUMER_STOP);

	health_code_update();

	/* relayd needs RCU read-side lock */
	rcu_read_lock();

	switch (msg.cmd_type) {
	case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
	{
		/* Session daemon status message are handled in the following call. */
		consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
				msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
				&msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
				msg.u.relayd_sock.relayd_session_id);
		goto end_nosignal;
	}
	case LTTNG_CONSUMER_DESTROY_RELAYD:
	{
		uint64_t index = msg.u.destroy_relayd.net_seq_idx;
		struct consumer_relayd_sock_pair *relayd;

		DBG("UST consumer destroying relayd %" PRIu64, index);

		/* Get relayd reference if exists. */
		relayd = consumer_find_relayd(index);
		if (relayd == NULL) {
			DBG("Unable to find relayd %" PRIu64, index);
			ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
		}

		/*
		 * Each relayd socket pair has a refcount of stream attached to it
		 * which tells if the relayd is still active or not depending on the
		 * refcount value.
		 *
		 * This will set the destroy flag of the relayd object and destroy it
		 * if the refcount reaches zero when called.
		 *
		 * The destroy can happen either here or when a stream fd hangs up.
		 */
		if (relayd) {
			consumer_flag_relayd_for_destroy(relayd);
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_UPDATE_STREAM:
	{
		/* Not implemented for UST. */
		rcu_read_unlock();
		return -ENOSYS;
	}
	case LTTNG_CONSUMER_DATA_PENDING:
	{
		int ret, is_data_pending;
		uint64_t id = msg.u.data_pending.session_id;

		DBG("UST consumer data pending command for id %" PRIu64, id);

		is_data_pending = consumer_data_pending(id);

		/* Send back returned value to session daemon */
		ret = lttcomm_send_unix_sock(sock, &is_data_pending,
				sizeof(is_data_pending));
		if (ret < 0) {
			DBG("Error when sending the data pending ret code: %d", ret);
			goto error_fatal;
		}

		/*
		 * No need to send back a status message since the data pending
		 * returned value is the response.
		 */
		break;
	}
	case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
	{
		int ret;
		struct ustctl_consumer_channel_attr attr;

		/* Create a plain object and reserve a channel key. */
		channel = consumer_allocate_channel(
				msg.u.ask_channel.key,
				msg.u.ask_channel.session_id,
				msg.u.ask_channel.pathname,
				msg.u.ask_channel.name,
				msg.u.ask_channel.uid,
				msg.u.ask_channel.gid,
				msg.u.ask_channel.relayd_id,
				(enum lttng_event_output) msg.u.ask_channel.output,
				msg.u.ask_channel.tracefile_size,
				msg.u.ask_channel.tracefile_count,
				msg.u.ask_channel.session_id_per_pid,
				msg.u.ask_channel.monitor,
				msg.u.ask_channel.live_timer_interval,
				msg.u.ask_channel.is_live,
				msg.u.ask_channel.root_shm_path,
				msg.u.ask_channel.shm_path);
		if (!channel) {
			goto end_channel_error;
		}

		/*
		 * Assign UST application UID to the channel. This value is ignored for
		 * per PID buffers. This is specific to UST thus setting this after the
		 * allocation.
		 */
		channel->ust_app_uid = msg.u.ask_channel.ust_app_uid;

		/* Build channel attributes from received message. */
		attr.subbuf_size = msg.u.ask_channel.subbuf_size;
		attr.num_subbuf = msg.u.ask_channel.num_subbuf;
		attr.overwrite = msg.u.ask_channel.overwrite;
		attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
		attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
		attr.chan_id = msg.u.ask_channel.chan_id;
		memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));

		/* Match channel buffer type to the UST abi. */
		switch (msg.u.ask_channel.output) {
		case LTTNG_EVENT_MMAP:
		default:
			attr.output = LTTNG_UST_MMAP;
			break;
		}

		/* Translate and save channel type. */
		switch (msg.u.ask_channel.type) {
		case LTTNG_UST_CHAN_PER_CPU:
			channel->type = CONSUMER_CHANNEL_TYPE_DATA;
			attr.type = LTTNG_UST_CHAN_PER_CPU;
			/*
			 * Set refcount to 1 for owner. Below, we will
			 * pass ownership to the
			 * consumer_thread_channel_poll() thread.
			 */
			channel->refcount = 1;
			break;
		case LTTNG_UST_CHAN_METADATA:
			channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
			attr.type = LTTNG_UST_CHAN_METADATA;
			break;
		default:
			assert(0);
			goto error_fatal;
		};

		health_code_update();

		ret = ask_channel(ctx, sock, channel, &attr);
		if (ret < 0) {
			goto end_channel_error;
		}

		if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
			/*
			 * Metadata channels use a cache plus a switch timer; the
			 * timer interval is zeroed afterwards since the timer now
			 * owns the periodic switch.
			 */
			ret = consumer_metadata_cache_allocate(channel);
			if (ret < 0) {
				ERR("Allocating metadata cache");
				goto end_channel_error;
			}
			consumer_timer_switch_start(channel, attr.switch_timer_interval);
			attr.switch_timer_interval = 0;
		} else {
			consumer_timer_live_start(channel,
					msg.u.ask_channel.live_timer_interval);
		}

		health_code_update();

		/*
		 * Add the channel to the internal state AFTER all streams were created
		 * and successfully sent to session daemon. This way, all streams must
		 * be ready before this channel is visible to the threads.
		 * If add_channel succeeds, ownership of the channel is
		 * passed to consumer_thread_channel_poll().
		 */
		ret = add_channel(channel, ctx);
		if (ret < 0) {
			/* Undo the timers/cache set up above before bailing out. */
			if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
				if (channel->switch_timer_enabled == 1) {
					consumer_timer_switch_stop(channel);
				}
				consumer_metadata_cache_destroy(channel);
			}
			if (channel->live_timer_enabled == 1) {
				consumer_timer_live_stop(channel);
			}
			goto end_channel_error;
		}

		health_code_update();

		/*
		 * Channel and streams are now created. Inform the session daemon that
		 * everything went well and should wait to receive the channel and
		 * streams with ustctl API.
		 */
		ret = consumer_send_status_channel(sock, channel);
		if (ret < 0) {
			/*
			 * There is probably a problem on the socket.
			 */
			goto error_fatal;
		}

		break;
	}
	case LTTNG_CONSUMER_GET_CHANNEL:
	{
		int ret, relayd_err = 0;
		uint64_t key = msg.u.get_channel.key;
		struct lttng_consumer_channel *channel;

		channel = consumer_find_channel(key);
		if (!channel) {
			ERR("UST consumer get channel key %" PRIu64 " not found", key);
			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
			goto end_msg_sessiond;
		}

		health_code_update();

		/* Send everything to sessiond. */
		ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
		if (ret < 0) {
			if (relayd_err) {
				/*
				 * We were unable to send to the relayd the stream so avoid
				 * sending back a fatal error to the thread since this is OK
				 * and the consumer can continue its work. The above call
				 * has sent the error status message to the sessiond.
				 */
				goto end_nosignal;
			}
			/*
			 * The communicaton was broken hence there is a bad state between
			 * the consumer and sessiond so stop everything.
			 */
			goto error_fatal;
		}

		health_code_update();

		/*
		 * In no monitor mode, the streams ownership is kept inside the channel
		 * so don't send them to the data thread.
		 */
		if (!channel->monitor) {
			goto end_msg_sessiond;
		}

		ret = send_streams_to_thread(channel, ctx);
		if (ret < 0) {
			/*
			 * If we are unable to send the stream to the thread, there is
			 * a big problem so just stop everything.
			 */
			goto error_fatal;
		}
		/* List MUST be empty after or else it could be reused. */
		assert(cds_list_empty(&channel->streams.head));
		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_CHANNEL_STOP_LIVE_TIMER:
	{
		uint64_t key = msg.u.get_channel.key;
		struct lttng_consumer_channel *channel;

		channel = consumer_find_channel(key);
		if (!channel) {
			ERR("UST consumer get channel key %" PRIu64 " not found", key);
			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
			goto end_msg_sessiond;
		}

		health_code_update();

		if (channel->live_timer_enabled == 1) {
			consumer_timer_live_stop(channel);
		}

		health_code_update();

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_CHANNEL_START_LIVE_TIMER:
	{
		uint64_t key = msg.u.get_channel.key;
		struct lttng_consumer_channel *channel;

		channel = consumer_find_channel(key);
		if (!channel) {
			ERR("UST consumer get channel key %" PRIu64 " not found", key);
			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
			goto end_msg_sessiond;
		}

		health_code_update();

		if (channel->live_timer_enabled == 0) {
			consumer_timer_live_start(channel, channel->live_timer_interval);
		}

		health_code_update();

		goto end_msg_sessiond;
	}

	case LTTNG_CONSUMER_DESTROY_CHANNEL:
	{
		uint64_t key = msg.u.destroy_channel.key;

		/*
		 * Only called if streams have not been sent to stream
		 * manager thread. However, channel has been sent to
		 * channel manager thread.
		 */
		notify_thread_del_channel(ctx, key);
		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_CLOSE_METADATA:
	{
		int ret;

		ret = close_metadata(msg.u.close_metadata.key);
		if (ret != 0) {
			ret_code = ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_FLUSH_CHANNEL:
	{
		int ret;

		ret = flush_channel(msg.u.flush_channel.key);
		if (ret != 0) {
			ret_code = ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
	{
		int ret;

		ret = clear_quiescent_channel(
				msg.u.clear_quiescent_channel.key);
		if (ret != 0) {
			ret_code = ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_PUSH_METADATA:
	{
		int ret;
		uint64_t len = msg.u.push_metadata.len;
		uint64_t key = msg.u.push_metadata.key;
		uint64_t offset = msg.u.push_metadata.target_offset;
		uint64_t version = msg.u.push_metadata.version;
		struct lttng_consumer_channel *channel;

		DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key,
				len);

		channel = consumer_find_channel(key);
		if (!channel) {
			/*
			 * This is possible if the metadata creation on the consumer side
			 * is in flight vis-a-vis a concurrent push metadata from the
			 * session daemon. Simply return that the channel failed and the
			 * session daemon will handle that message correctly considering
			 * that this race is acceptable thus the DBG() statement here.
			 */
			DBG("UST consumer push metadata %" PRIu64 " not found", key);
			ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
			goto end_msg_sessiond;
		}

		health_code_update();

		if (!len) {
			/*
			 * There is nothing to receive. We have simply
			 * checked whether the channel can be found.
			 */
			ret_code = LTTCOMM_CONSUMERD_SUCCESS;
			goto end_msg_sessiond;
		}

		/* Tell session daemon we are ready to receive the metadata. */
		ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
		if (ret < 0) {
			/* Somehow, the session daemon is not responding anymore. */
			goto error_fatal;
		}

		health_code_update();

		/* Wait for more data. */
		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret) {
			goto error_fatal;
		}

		health_code_update();

		ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
				len, version, channel, 0, 1);
		if (ret < 0) {
			/* error receiving from sessiond */
			goto error_fatal;
		} else {
			ret_code = ret;
			goto end_msg_sessiond;
		}
	}
	case LTTNG_CONSUMER_SETUP_METADATA:
	{
		int ret;

		ret = setup_metadata(ctx, msg.u.setup_metadata.key);
		if (ret) {
			ret_code = ret;
		}
		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
	{
		/* Dispatch on metadata vs data channel snapshot. */
		if (msg.u.snapshot_channel.metadata) {
			ret = snapshot_metadata(msg.u.snapshot_channel.key,
					msg.u.snapshot_channel.pathname,
					msg.u.snapshot_channel.relayd_id,
					ctx);
			if (ret < 0) {
				ERR("Snapshot metadata failed");
				ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
			}
		} else {
			ret = snapshot_channel(msg.u.snapshot_channel.key,
					msg.u.snapshot_channel.pathname,
					msg.u.snapshot_channel.relayd_id,
					msg.u.snapshot_channel.nb_packets_per_stream,
					ctx);
			if (ret < 0) {
				ERR("Snapshot channel failed");
				ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
			}
		}

		health_code_update();
		ret = consumer_send_status_msg(sock, ret_code);
		if (ret < 0) {
			/* Somehow, the session daemon is not responding anymore. */
			goto end_nosignal;
		}
		health_code_update();
		break;
	}
	case LTTNG_CONSUMER_DISCARDED_EVENTS:
	{
		int ret = 0;
		uint64_t discarded_events;
		struct lttng_ht_iter iter;
		struct lttng_ht *ht;
		struct lttng_consumer_stream *stream;
		uint64_t id = msg.u.discarded_events.session_id;
		uint64_t key = msg.u.discarded_events.channel_key;

		DBG("UST consumer discarded events command for session id %"
				PRIu64, id);
		rcu_read_lock();
		pthread_mutex_lock(&consumer_data.lock);

		ht = consumer_data.stream_list_ht;

		/*
		 * We only need a reference to the channel, but they are not
		 * directly indexed, so we just use the first matching stream
		 * to extract the information we need, we default to 0 if not
		 * found (no events are dropped if the channel is not yet in
		 * use).
		 */
		discarded_events = 0;
		cds_lfht_for_each_entry_duplicate(ht->ht,
				ht->hash_fct(&id, lttng_ht_seed),
				ht->match_fct, &id,
				&iter.iter, stream, node_session_id.node) {
			if (stream->chan->key == key) {
				discarded_events = stream->chan->discarded_events;
				break;
			}
		}
		pthread_mutex_unlock(&consumer_data.lock);
		rcu_read_unlock();

		DBG("UST consumer discarded events command for session id %"
				PRIu64 ", channel key %" PRIu64, id, key);

		health_code_update();

		/* Send back returned value to session daemon */
		ret = lttcomm_send_unix_sock(sock, &discarded_events, sizeof(discarded_events));
		if (ret < 0) {
			PERROR("send discarded events");
			goto error_fatal;
		}

		break;
	}
	case LTTNG_CONSUMER_LOST_PACKETS:
	{
		int ret;
		uint64_t lost_packets;
		struct lttng_ht_iter iter;
		struct lttng_ht *ht;
		struct lttng_consumer_stream *stream;
		uint64_t id = msg.u.lost_packets.session_id;
		uint64_t key = msg.u.lost_packets.channel_key;

		DBG("UST consumer lost packets command for session id %"
				PRIu64, id);
		rcu_read_lock();
		pthread_mutex_lock(&consumer_data.lock);

		ht = consumer_data.stream_list_ht;

		/*
		 * We only need a reference to the channel, but they are not
		 * directly indexed, so we just use the first matching stream
		 * to extract the information we need, we default to 0 if not
		 * found (no packets lost if the channel is not yet in use).
		 */
		lost_packets = 0;
		cds_lfht_for_each_entry_duplicate(ht->ht,
				ht->hash_fct(&id, lttng_ht_seed),
				ht->match_fct, &id,
				&iter.iter, stream, node_session_id.node) {
			if (stream->chan->key == key) {
				lost_packets = stream->chan->lost_packets;
				break;
			}
		}
		pthread_mutex_unlock(&consumer_data.lock);
		rcu_read_unlock();

		DBG("UST consumer lost packets command for session id %"
				PRIu64 ", channel key %" PRIu64, id, key);

		health_code_update();

		/* Send back returned value to session daemon */
		ret = lttcomm_send_unix_sock(sock, &lost_packets,
				sizeof(lost_packets));
		if (ret < 0) {
			PERROR("send lost packets");
			goto error_fatal;
		}

		break;
	}
	default:
		break;
	}

end_nosignal:
	rcu_read_unlock();

	health_code_update();

	/*
	 * Return 1 to indicate success since the 0 value can be a socket
	 * shutdown during the recv() or send() call.
	 */
	return 1;

end_msg_sessiond:
	/*
	 * The returned value here is not useful since either way we'll return 1 to
	 * the caller because the session daemon socket management is done
	 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
	 */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0) {
		goto error_fatal;
	}
	rcu_read_unlock();

	health_code_update();

	return 1;
end_channel_error:
	if (channel) {
		/*
		 * Free channel here since no one has a reference to it. We don't
		 * free after that because a stream can store this pointer.
		 */
		destroy_channel(channel);
	}
	/* We have to send a status channel message indicating an error. */
	ret = consumer_send_status_channel(sock, NULL);
	if (ret < 0) {
		/* Stop everything if session daemon can not be notified. */
		goto error_fatal;
	}
	rcu_read_unlock();

	health_code_update();

	return 1;
error_fatal:
	rcu_read_unlock();
	/* This will issue a consumer stop. */
	return -1;
}
1999
/*
 * Flush a stream's ring buffer through the ustctl API.
 *
 * producer_active is forwarded verbatim to ustctl_flush_buffer().
 */
void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream,
		int producer_active)
{
	assert(stream);
	assert(stream->ustream);

	ustctl_flush_buffer(stream->ustream, producer_active);
}
2008
ffe60014
DG
2009/*
2010 * Take a snapshot for a specific fd
2011 *
2012 * Returns 0 on success, < 0 on error
2013 */
2014int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 2015{
ffe60014
DG
2016 assert(stream);
2017 assert(stream->ustream);
2018
2019 return ustctl_snapshot(stream->ustream);
3bd1e081
MD
2020}
2021
ffe60014
DG
2022/*
2023 * Get the produced position
2024 *
2025 * Returns 0 on success, < 0 on error
2026 */
2027int lttng_ustconsumer_get_produced_snapshot(
2028 struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081 2029{
ffe60014
DG
2030 assert(stream);
2031 assert(stream->ustream);
2032 assert(pos);
7a57cf92 2033
ffe60014
DG
2034 return ustctl_snapshot_get_produced(stream->ustream, pos);
2035}
7a57cf92 2036
10a50311
JD
2037/*
2038 * Get the consumed position
2039 *
2040 * Returns 0 on success, < 0 on error
2041 */
2042int lttng_ustconsumer_get_consumed_snapshot(
2043 struct lttng_consumer_stream *stream, unsigned long *pos)
2044{
2045 assert(stream);
2046 assert(stream->ustream);
2047 assert(pos);
2048
2049 return ustctl_snapshot_get_consumed(stream->ustream, pos);
2050}
2051
/*
 * Flush a stream's ring buffer.
 *
 * producer is forwarded verbatim to ustctl_flush_buffer().
 */
void lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
		int producer)
{
	assert(stream);
	assert(stream->ustream);

	ustctl_flush_buffer(stream->ustream, producer);
}
2060
2061int lttng_ustconsumer_get_current_timestamp(
2062 struct lttng_consumer_stream *stream, uint64_t *ts)
2063{
2064 assert(stream);
2065 assert(stream->ustream);
2066 assert(ts);
2067
2068 return ustctl_get_current_timestamp(stream->ustream, ts);
2069}
2070
fb83fe64
JD
2071int lttng_ustconsumer_get_sequence_number(
2072 struct lttng_consumer_stream *stream, uint64_t *seq)
2073{
2074 assert(stream);
2075 assert(stream->ustream);
2076 assert(seq);
2077
2078 return ustctl_get_sequence_number(stream->ustream, seq);
2079}
2080
ffe60014 2081/*
0dd01979 2082 * Called when the stream signals the consumer that it has hung up.
ffe60014
DG
2083 */
2084void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
2085{
2086 assert(stream);
2087 assert(stream->ustream);
2c1dd183 2088
0dd01979
MD
2089 pthread_mutex_lock(&stream->lock);
2090 if (!stream->quiescent) {
2091 ustctl_flush_buffer(stream->ustream, 0);
2092 stream->quiescent = true;
2093 }
2094 pthread_mutex_unlock(&stream->lock);
ffe60014
DG
2095 stream->hangup_flush_done = 1;
2096}
ee77a7b0 2097
ffe60014
DG
2098void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
2099{
4628484a
MD
2100 int i;
2101
ffe60014
DG
2102 assert(chan);
2103 assert(chan->uchan);
e316aad5 2104
ea88ca2a
MD
2105 if (chan->switch_timer_enabled == 1) {
2106 consumer_timer_switch_stop(chan);
2107 }
4628484a
MD
2108 for (i = 0; i < chan->nr_stream_fds; i++) {
2109 int ret;
2110
2111 ret = close(chan->stream_fds[i]);
2112 if (ret) {
2113 PERROR("close");
2114 }
2115 if (chan->shm_path[0]) {
2116 char shm_path[PATH_MAX];
2117
2118 ret = get_stream_shm_path(shm_path, chan->shm_path, i);
2119 if (ret) {
2120 ERR("Cannot get stream shm path");
2121 }
2122 ret = run_as_unlink(shm_path, chan->uid, chan->gid);
2123 if (ret) {
4628484a
MD
2124 PERROR("unlink %s", shm_path);
2125 }
2126 }
2127 }
3bd1e081
MD
2128}
2129
b83e03c4
MD
2130void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
2131{
2132 assert(chan);
2133 assert(chan->uchan);
2134
2135 consumer_metadata_cache_destroy(chan);
2136 ustctl_destroy_channel(chan->uchan);
ea853771
JR
2137 /* Try to rmdir all directories under shm_path root. */
2138 if (chan->root_shm_path[0]) {
2139 (void) run_as_recursive_rmdir(chan->root_shm_path,
2140 chan->uid, chan->gid);
2141 }
b83e03c4
MD
2142 free(chan->stream_fds);
2143}
2144
3bd1e081
MD
2145void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
2146{
ffe60014
DG
2147 assert(stream);
2148 assert(stream->ustream);
d41f73b7 2149
ea88ca2a
MD
2150 if (stream->chan->switch_timer_enabled == 1) {
2151 consumer_timer_switch_stop(stream->chan);
2152 }
ffe60014
DG
2153 ustctl_destroy_stream(stream->ustream);
2154}
d41f73b7 2155
6d574024
DG
2156int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
2157{
2158 assert(stream);
2159 assert(stream->ustream);
2160
2161 return ustctl_stream_get_wakeup_fd(stream->ustream);
2162}
2163
2164int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
2165{
2166 assert(stream);
2167 assert(stream->ustream);
2168
2169 return ustctl_stream_close_wakeup_fd(stream->ustream);
2170}
2171
93ec662e 2172static
3910d1ea
JG
2173void metadata_stream_reset_cache_consumed_position(
2174 struct lttng_consumer_stream *stream)
93ec662e 2175{
29d1a7ae
JG
2176 DBG("Reset metadata cache of session %" PRIu64,
2177 stream->chan->session_id);
93ec662e 2178 stream->ust_metadata_pushed = 0;
93ec662e
JD
2179}
2180
94d49140
JD
/*
 * Write up to one packet from the metadata cache to the channel.
 *
 * Returns the number of bytes pushed in the cache, or a negative value
 * on error. A return of 0 means the cache held no unconsumed metadata.
 */
static
int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
{
	ssize_t write_len;
	int ret;

	pthread_mutex_lock(&stream->chan->metadata_cache->lock);
	if (stream->chan->metadata_cache->max_offset ==
			stream->ust_metadata_pushed) {
		/*
		 * In the context of a user space metadata channel, a
		 * change in version can be detected in two ways:
		 * 1) During the pre-consume of the `read_subbuffer` loop,
		 * 2) When populating the metadata ring buffer (i.e. here).
		 *
		 * This function is invoked when there is no metadata
		 * available in the ring-buffer. If all data was consumed
		 * up to the size of the metadata cache, there is no metadata
		 * to insert in the ring-buffer.
		 *
		 * However, the metadata version could still have changed (a
		 * regeneration without any new data will yield the same cache
		 * size).
		 *
		 * The cache's version is checked for a version change and the
		 * consumed position is reset if one occurred.
		 *
		 * This check is only necessary for the user space domain as
		 * it has to manage the cache explicitly. If this reset was not
		 * performed, no metadata would be consumed (and no reset would
		 * occur as part of the pre-consume) until the metadata size
		 * exceeded the cache size.
		 */
		if (stream->metadata_version !=
				stream->chan->metadata_cache->version) {
			metadata_stream_reset_cache_consumed_position(stream);
			consumer_stream_metadata_set_version(stream,
					stream->chan->metadata_cache->version);
		} else {
			ret = 0;
			goto end;
		}
	}

	/* Push everything from the consumed position up to max_offset;
	 * ust-ctl caps the write to a single packet. */
	write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
			&stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
			stream->chan->metadata_cache->max_offset
			- stream->ust_metadata_pushed);
	assert(write_len != 0);
	if (write_len < 0) {
		ERR("Writing one metadata packet");
		ret = write_len;
		goto end;
	}
	stream->ust_metadata_pushed += write_len;

	assert(stream->chan->metadata_cache->max_offset >=
			stream->ust_metadata_pushed);
	/* NOTE(review): narrowing ssize_t -> int; assumed safe for a
	 * single-packet write length. */
	ret = write_len;

	/*
	 * Switch packet (but don't open the next one) on every commit of
	 * a metadata packet. Since the subbuffer is fully filled (with padding,
	 * if needed), the stream is "quiescent" after this commit.
	 */
	ustctl_flush_buffer(stream->ustream, 1);
end:
	pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
	return ret;
}
2257
309167d2 2258
94d49140
JD
/*
 * Sync metadata: request metadata from the session daemon and take a
 * buffer snapshot so that the metadata thread can consume it.
 *
 * The metadata stream lock is held on entry, but it must be released
 * while interacting with sessiond, else we cause a deadlock with a
 * live client awaiting metadata push-out.
 *
 * Return 0 if new metadata is available, EAGAIN if the metadata stream
 * is empty or a negative value on error.
 */
int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *metadata)
{
	int ret;
	int retry = 0;

	assert(ctx);
	assert(metadata);

	pthread_mutex_unlock(&metadata->lock);
	/*
	 * Request metadata from the sessiond, but don't wait for the flush
	 * because we locked the metadata thread.
	 */
	ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
	pthread_mutex_lock(&metadata->lock);
	if (ret < 0) {
		goto end;
	}

	/* Move freshly received cache content into the ring buffer. */
	ret = commit_one_metadata_packet(metadata);
	if (ret <= 0) {
		goto end;
	} else if (ret > 0) {
		/* Bytes were committed; more extraction is needed below. */
		retry = 1;
	}

	ustctl_flush_buffer(metadata->ustream, 1);
	ret = ustctl_snapshot(metadata->ustream);
	if (ret < 0) {
		if (errno != EAGAIN) {
			ERR("Sync metadata, taking UST snapshot");
			goto end;
		}
		DBG("No new metadata when syncing them.");
		/* No new metadata, exit. */
		ret = ENODATA;
		goto end;
	}

	/*
	 * After this flush, we still need to extract metadata.
	 */
	if (retry) {
		ret = EAGAIN;
	}

end:
	return ret;
}
2320
02b3d176
DG
2321/*
2322 * Return 0 on success else a negative value.
2323 */
2324static int notify_if_more_data(struct lttng_consumer_stream *stream,
2325 struct lttng_consumer_local_data *ctx)
2326{
2327 int ret;
2328 struct ustctl_consumer_stream *ustream;
2329
2330 assert(stream);
2331 assert(ctx);
2332
2333 ustream = stream->ustream;
2334
2335 /*
2336 * First, we are going to check if there is a new subbuffer available
2337 * before reading the stream wait_fd.
2338 */
2339 /* Get the next subbuffer */
2340 ret = ustctl_get_next_subbuf(ustream);
2341 if (ret) {
2342 /* No more data found, flag the stream. */
2343 stream->has_data = 0;
2344 ret = 0;
2345 goto end;
2346 }
2347
5420e5db 2348 ret = ustctl_put_subbuf(ustream);
02b3d176
DG
2349 assert(!ret);
2350
2351 /* This stream still has data. Flag it and wake up the data thread. */
2352 stream->has_data = 1;
2353
2354 if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
2355 ssize_t writelen;
2356
2357 writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
2358 if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
2359 ret = writelen;
2360 goto end;
2361 }
2362
2363 /* The wake up pipe has been notified. */
2364 ctx->has_wakeup = 1;
2365 }
2366 ret = 0;
2367
2368end:
2369 return ret;
2370}
2371
29d1a7ae 2372static int consumer_stream_ust_on_wake_up(struct lttng_consumer_stream *stream)
fb83fe64 2373{
29d1a7ae 2374 int ret = 0;
fb83fe64 2375
fb83fe64 2376 /*
29d1a7ae
JG
2377 * We can consume the 1 byte written into the wait_fd by
2378 * UST. Don't trigger error if we cannot read this one byte
2379 * (read returns 0), or if the error is EAGAIN or EWOULDBLOCK.
2380 *
2381 * This is only done when the stream is monitored by a thread,
2382 * before the flush is done after a hangup and if the stream
2383 * is not flagged with data since there might be nothing to
2384 * consume in the wait fd but still have data available
2385 * flagged by the consumer wake up pipe.
fb83fe64 2386 */
29d1a7ae
JG
2387 if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
2388 char dummy;
2389 ssize_t readlen;
2390
2391 readlen = lttng_read(stream->wait_fd, &dummy, 1);
2392 if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
2393 ret = readlen;
2394 }
fb83fe64 2395 }
fb83fe64 2396
29d1a7ae
JG
2397 return ret;
2398}
2399
2400static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
2401 struct stream_subbuffer *subbuf)
2402{
2403 int ret;
2404
2405 ret = ustctl_get_subbuf_size(
2406 stream->ustream, &subbuf->info.data.subbuf_size);
2407 if (ret) {
fb83fe64
JD
2408 goto end;
2409 }
29d1a7ae
JG
2410
2411 ret = ustctl_get_padded_subbuf_size(
2412 stream->ustream, &subbuf->info.data.padded_subbuf_size);
2413 if (ret) {
2414 goto end;
fb83fe64 2415 }
fb83fe64
JD
2416
2417end:
2418 return ret;
2419}
2420
29d1a7ae
JG
2421static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
2422 struct stream_subbuffer *subbuf)
d41f73b7 2423{
29d1a7ae 2424 int ret;
ffe60014 2425
29d1a7ae
JG
2426 ret = extract_common_subbuffer_info(stream, subbuf);
2427 if (ret) {
2428 goto end;
2429 }
d41f73b7 2430
3910d1ea 2431 subbuf->info.metadata.version = stream->metadata_version;
ffe60014 2432
29d1a7ae
JG
2433end:
2434 return ret;
2435}
d41f73b7 2436
29d1a7ae
JG
/*
 * Populate a data stream sub-buffer descriptor: sizes, packet/content
 * sizes, begin/end timestamps, discarded-event count, and the optional
 * sequence number and stream instance id (absent on older tracers,
 * signalled by -ENOTTY).
 *
 * Returns 0 on success, a negative value on error.
 */
static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
		struct stream_subbuffer *subbuf)
{
	int ret;

	ret = extract_common_subbuffer_info(stream, subbuf);
	if (ret) {
		goto end;
	}

	ret = ustctl_get_packet_size(
			stream->ustream, &subbuf->info.data.packet_size);
	if (ret < 0) {
		PERROR("Failed to get sub-buffer packet size");
		goto end;
	}

	ret = ustctl_get_content_size(
			stream->ustream, &subbuf->info.data.content_size);
	if (ret < 0) {
		PERROR("Failed to get sub-buffer content size");
		goto end;
	}

	ret = ustctl_get_timestamp_begin(
			stream->ustream, &subbuf->info.data.timestamp_begin);
	if (ret < 0) {
		PERROR("Failed to get sub-buffer begin timestamp");
		goto end;
	}

	ret = ustctl_get_timestamp_end(
			stream->ustream, &subbuf->info.data.timestamp_end);
	if (ret < 0) {
		PERROR("Failed to get sub-buffer end timestamp");
		goto end;
	}

	ret = ustctl_get_events_discarded(
			stream->ustream, &subbuf->info.data.events_discarded);
	if (ret) {
		PERROR("Failed to get sub-buffer events discarded count");
		goto end;
	}

	ret = ustctl_get_sequence_number(stream->ustream,
			&subbuf->info.data.sequence_number.value);
	if (ret) {
		/* May not be supported by older LTTng-modules. */
		if (ret != -ENOTTY) {
			PERROR("Failed to get sub-buffer sequence number");
			goto end;
		}
	} else {
		subbuf->info.data.sequence_number.is_set = true;
	}

	ret = ustctl_get_stream_id(
			stream->ustream, &subbuf->info.data.stream_id);
	if (ret < 0) {
		PERROR("Failed to get stream id");
		goto end;
	}

	ret = ustctl_get_instance_id(stream->ustream,
			&subbuf->info.data.stream_instance_id.value);
	if (ret) {
		/* May not be supported by older LTTng-modules. */
		if (ret != -ENOTTY) {
			PERROR("Failed to get stream instance id");
			goto end;
		}
	} else {
		subbuf->info.data.stream_instance_id.is_set = true;
	}
end:
	return ret;
}
1d4dfdef 2515
29d1a7ae
JG
2516static int get_next_subbuffer_common(struct lttng_consumer_stream *stream,
2517 struct stream_subbuffer *subbuffer)
2518{
2519 int ret;
2520 const char *addr;
1d4dfdef 2521
29d1a7ae
JG
2522 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
2523 stream, subbuffer);
2524 if (ret) {
2525 goto end;
2526 }
1fdb9a78 2527
29d1a7ae 2528 ret = get_current_subbuf_addr(stream, &addr);
1fdb9a78 2529 if (ret) {
29d1a7ae 2530 goto end;
1fdb9a78
JG
2531 }
2532
29d1a7ae
JG
2533 subbuffer->buffer.buffer = lttng_buffer_view_init(
2534 addr, 0, subbuffer->info.data.padded_subbuf_size);
2535 assert(subbuffer->buffer.buffer.data != NULL);
2536end:
2537 return ret;
2538}
ace0e591 2539
29d1a7ae
JG
2540static int get_next_subbuffer(struct lttng_consumer_stream *stream,
2541 struct stream_subbuffer *subbuffer)
2542{
2543 int ret;
331744e3 2544
29d1a7ae
JG
2545 ret = ustctl_get_next_subbuf(stream->ustream);
2546 if (ret) {
2547 goto end;
02b3d176
DG
2548 }
2549
29d1a7ae
JG
2550 ret = get_next_subbuffer_common(stream, subbuffer);
2551 if (ret) {
1c20f0e2
JD
2552 goto end;
2553 }
29d1a7ae
JG
2554end:
2555 return ret;
2556}
1c20f0e2 2557
29d1a7ae
JG
/*
 * Acquire the next metadata sub-buffer, re-filling the ring buffer
 * from the metadata cache when it is empty, and determine whether the
 * resulting view is "coherent" (i.e. a complete unit of metadata).
 *
 * Returns 0 on success, -ENODATA when the cache has nothing left to
 * push, or a negative value on error.
 */
static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream,
		struct stream_subbuffer *subbuffer)
{
	int ret;
	bool cache_empty;
	bool got_subbuffer;
	bool coherent;
	bool buffer_empty;
	unsigned long consumed_pos, produced_pos;

	do {
		ret = ustctl_get_next_subbuf(stream->ustream);
		if (ret == 0) {
			got_subbuffer = true;
		} else {
			got_subbuffer = false;
			if (ret != -EAGAIN) {
				/* Fatal error. */
				goto end;
			}
		}

		/*
		 * Determine if the cache is empty and ensure that a sub-buffer
		 * is made available if the cache is not empty.
		 */
		if (!got_subbuffer) {
			ret = commit_one_metadata_packet(stream);
			if (ret < 0 && ret != -ENOBUFS) {
				goto end;
			} else if (ret == 0) {
				/* Not an error, the cache is empty. */
				cache_empty = true;
				ret = -ENODATA;
				goto end;
			} else {
				cache_empty = false;
			}
		} else {
			pthread_mutex_lock(&stream->chan->metadata_cache->lock);
			cache_empty = stream->chan->metadata_cache->max_offset ==
					stream->ust_metadata_pushed;
			pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
		}
	} while (!got_subbuffer);

	/* Populate sub-buffer infos and view. */
	ret = get_next_subbuffer_common(stream, subbuffer);
	if (ret) {
		goto end;
	}

	ret = lttng_ustconsumer_take_snapshot(stream);
	if (ret < 0) {
		/*
		 * -EAGAIN is not expected since we got a sub-buffer and haven't
		 * pushed the consumption position yet (on put_next).
		 */
		PERROR("Failed to take a snapshot of metadata buffer positions");
		goto end;
	}

	ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
	if (ret) {
		PERROR("Failed to get metadata consumed position");
		goto end;
	}

	ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
	if (ret) {
		PERROR("Failed to get metadata produced position");
		goto end;
	}

	/* Last sub-buffer of the ring buffer ? */
	buffer_empty = (consumed_pos + stream->max_sb_size) == produced_pos;

	/*
	 * The sessiond registry lock ensures that coherent units of metadata
	 * are pushed to the consumer daemon at once. Hence, if a sub-buffer is
	 * acquired, the cache is empty, and it is the only available sub-buffer
	 * available, it is safe to assume that it is "coherent".
	 */
	coherent = got_subbuffer && cache_empty && buffer_empty;

	LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
end:
	return ret;
}
2647
29d1a7ae
JG
2648static int put_next_subbuffer(struct lttng_consumer_stream *stream,
2649 struct stream_subbuffer *subbuffer)
2650{
2651 const int ret = ustctl_put_next_subbuf(stream->ustream);
2652
2653 assert(ret == 0);
2654 return ret;
2655}
2656
2657static int signal_metadata(struct lttng_consumer_stream *stream,
2658 struct lttng_consumer_local_data *ctx)
2659{
2660 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
2661}
2662
d6ef77b3 2663static int lttng_ustconsumer_set_stream_ops(
29d1a7ae
JG
2664 struct lttng_consumer_stream *stream)
2665{
d6ef77b3
JG
2666 int ret = 0;
2667
29d1a7ae
JG
2668 stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up;
2669 if (stream->metadata_flag) {
2670 stream->read_subbuffer_ops.get_next_subbuffer =
2671 get_next_subbuffer_metadata;
2672 stream->read_subbuffer_ops.extract_subbuffer_info =
2673 extract_metadata_subbuffer_info;
2674 stream->read_subbuffer_ops.reset_metadata =
3910d1ea 2675 metadata_stream_reset_cache_consumed_position;
d6ef77b3
JG
2676 if (stream->chan->is_live) {
2677 stream->read_subbuffer_ops.on_sleep = signal_metadata;
2678 ret = consumer_stream_enable_metadata_bucketization(
2679 stream);
2680 if (ret) {
2681 goto end;
2682 }
2683 }
29d1a7ae
JG
2684 } else {
2685 stream->read_subbuffer_ops.get_next_subbuffer =
2686 get_next_subbuffer;
2687 stream->read_subbuffer_ops.extract_subbuffer_info =
2688 extract_data_subbuffer_info;
2689 stream->read_subbuffer_ops.on_sleep = notify_if_more_data;
2690 if (stream->chan->is_live) {
2691 stream->read_subbuffer_ops.send_live_beacon =
2692 consumer_flush_ust_index;
2693 }
2694 }
2695
2696 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
d6ef77b3
JG
2697end:
2698 return ret;
29d1a7ae
JG
2699}
2700
ffe60014
DG
2701/*
2702 * Called when a stream is created.
fe4477ee
JD
2703 *
2704 * Return 0 on success or else a negative value.
ffe60014 2705 */
d41f73b7
MD
2706int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
2707{
fe4477ee
JD
2708 int ret;
2709
10a50311
JD
2710 assert(stream);
2711
fe4477ee 2712 /* Don't create anything if this is set for streaming. */
6d40f8fa 2713 if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) {
fe4477ee
JD
2714 ret = utils_create_stream_file(stream->chan->pathname, stream->name,
2715 stream->chan->tracefile_size, stream->tracefile_count_current,
309167d2 2716 stream->uid, stream->gid, NULL);
fe4477ee
JD
2717 if (ret < 0) {
2718 goto error;
2719 }
2720 stream->out_fd = ret;
2721 stream->tracefile_size_current = 0;
309167d2
JD
2722
2723 if (!stream->metadata_flag) {
e0547b83
MD
2724 struct lttng_index_file *index_file;
2725
2726 index_file = lttng_index_file_create(stream->chan->pathname,
309167d2
JD
2727 stream->name, stream->uid, stream->gid,
2728 stream->chan->tracefile_size,
e0547b83
MD
2729 stream->tracefile_count_current,
2730 CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
2731 if (!index_file) {
309167d2
JD
2732 goto error;
2733 }
e0547b83 2734 stream->index_file = index_file;
309167d2 2735 }
fe4477ee 2736 }
29d1a7ae
JG
2737
2738 lttng_ustconsumer_set_stream_ops(stream);
fe4477ee
JD
2739 ret = 0;
2740
2741error:
2742 return ret;
d41f73b7 2743}
ca22feea
DG
2744
/*
 * Check if data is still being extracted from the buffers for a specific
 * stream. Consumer data lock MUST be acquired before calling this function
 * and the stream lock.
 *
 * Return 1 if the traced data are still getting read else 0 meaning that the
 * data is available for trace viewer reading.
 */
int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
{
	int ret;

	assert(stream);
	assert(stream->ustream);

	DBG("UST consumer checking data pending");

	/* A hung-up stream can never have pending data. */
	if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
		ret = 0;
		goto end;
	}

	if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) {
		uint64_t contiguous, pushed;

		/* Ease our life a bit. */
		contiguous = stream->chan->metadata_cache->max_offset;
		pushed = stream->ust_metadata_pushed;

		/*
		 * We can simply check whether all contiguously available data
		 * has been pushed to the ring buffer, since the push operation
		 * is performed within get_next_subbuf(), and because both
		 * get_next_subbuf() and put_next_subbuf() are issued atomically
		 * thanks to the stream lock within
		 * lttng_ustconsumer_read_subbuffer(). This basically means that
		 * whetnever ust_metadata_pushed is incremented, the associated
		 * metadata has been consumed from the metadata stream.
		 */
		DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
				contiguous, pushed);
		assert(((int64_t) (contiguous - pushed)) >= 0);
		/*
		 * NOTE(review): the second clause below is redundant with the
		 * first except that it also reports "pending" when
		 * contiguous == pushed == 0 (empty cache). Confirm this is the
		 * intended semantic before simplifying.
		 */
		if ((contiguous != pushed) ||
				(((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
			ret = 1;	/* Data is pending */
			goto end;
		}
	} else {
		ret = ustctl_get_next_subbuf(stream->ustream);
		if (ret == 0) {
			/*
			 * There is still data so let's put back this
			 * subbuffer.
			 */
			ret = ustctl_put_subbuf(stream->ustream);
			assert(ret == 0);
			ret = 1;	/* Data is pending */
			goto end;
		}
	}

	/* Data is NOT pending so ready to be read. */
	ret = 0;

end:
	return ret;
}
d88aee68 2812
6d574024
DG
2813/*
2814 * Stop a given metadata channel timer if enabled and close the wait fd which
2815 * is the poll pipe of the metadata stream.
2816 *
2817 * This MUST be called with the metadata channel acquired.
2818 */
2819void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata)
2820{
2821 int ret;
2822
2823 assert(metadata);
2824 assert(metadata->type == CONSUMER_CHANNEL_TYPE_METADATA);
2825
2826 DBG("Closing metadata channel key %" PRIu64, metadata->key);
2827
2828 if (metadata->switch_timer_enabled == 1) {
2829 consumer_timer_switch_stop(metadata);
2830 }
2831
2832 if (!metadata->metadata_stream) {
2833 goto end;
2834 }
2835
2836 /*
2837 * Closing write side so the thread monitoring the stream wakes up if any
2838 * and clean the metadata stream.
2839 */
2840 if (metadata->metadata_stream->ust_metadata_poll_pipe[1] >= 0) {
2841 ret = close(metadata->metadata_stream->ust_metadata_poll_pipe[1]);
2842 if (ret < 0) {
2843 PERROR("closing metadata pipe write side");
2844 }
2845 metadata->metadata_stream->ust_metadata_poll_pipe[1] = -1;
2846 }
2847
2848end:
2849 return;
2850}
2851
d88aee68
DG
2852/*
2853 * Close every metadata stream wait fd of the metadata hash table. This
2854 * function MUST be used very carefully so not to run into a race between the
2855 * metadata thread handling streams and this function closing their wait fd.
2856 *
2857 * For UST, this is used when the session daemon hangs up. Its the metadata
2858 * producer so calling this is safe because we are assured that no state change
2859 * can occur in the metadata thread for the streams in the hash table.
2860 */
6d574024 2861void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
d88aee68 2862{
d88aee68
DG
2863 struct lttng_ht_iter iter;
2864 struct lttng_consumer_stream *stream;
2865
2866 assert(metadata_ht);
2867 assert(metadata_ht->ht);
2868
2869 DBG("UST consumer closing all metadata streams");
2870
2871 rcu_read_lock();
2872 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
2873 node.node) {
9ce5646a
MD
2874
2875 health_code_update();
2876
be2b50c7 2877 pthread_mutex_lock(&stream->chan->lock);
6d574024 2878 lttng_ustconsumer_close_metadata(stream->chan);
be2b50c7
DG
2879 pthread_mutex_unlock(&stream->chan->lock);
2880
d88aee68
DG
2881 }
2882 rcu_read_unlock();
2883}
d8ef542d
MD
2884
2885void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
2886{
2887 int ret;
2888
2889 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
2890 if (ret < 0) {
2891 ERR("Unable to close wakeup fd");
2892 }
2893}
331744e3 2894
f666ae70
MD
/*
 * Request the session daemon to push the channel's metadata to this
 * consumer over the metadata socket, then receive it into the metadata
 * cache via lttng_ustconsumer_recv_metadata().
 *
 * 'timer' and 'wait' are forwarded to the receive path.
 *
 * Please refer to consumer-timer.c before adding any lock within this
 * function or any of its callees. Timers have a very strict locking
 * semantic with respect to teardown. Failure to respect this semantic
 * introduces deadlocks.
 *
 * DON'T hold the metadata lock when calling this function, else this
 * can cause deadlock involving consumer awaiting for metadata to be
 * pushed out due to concurrent interaction with the session daemon.
 *
 * Returns 0 on success (including "no registry/no metadata"), a
 * negative value on error.
 */
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_channel *channel, int timer, int wait)
{
	struct lttcomm_metadata_request_msg request;
	struct lttcomm_consumer_msg msg;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	uint64_t len, key, offset, version;
	int ret;

	assert(channel);
	assert(channel->metadata_cache);

	memset(&request, 0, sizeof(request));

	/* send the metadata request to sessiond */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER64_UST:
		request.bits_per_long = 64;
		break;
	case LTTNG_CONSUMER32_UST:
		request.bits_per_long = 32;
		break;
	default:
		request.bits_per_long = 0;
		break;
	}

	request.session_id = channel->session_id;
	request.session_id_per_pid = channel->session_id_per_pid;
	/*
	 * Request the application UID here so the metadata of that application can
	 * be sent back. The channel UID corresponds to the user UID of the session
	 * used for the rights on the stream file(s).
	 */
	request.uid = channel->ust_app_uid;
	request.key = channel->key;

	DBG("Sending metadata request to sessiond, session id %" PRIu64
			", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64,
			request.session_id, request.session_id_per_pid, request.uid,
			request.key);

	/* Serializes all users of the metadata socket. */
	pthread_mutex_lock(&ctx->metadata_socket_lock);

	health_code_update();

	ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
			sizeof(request));
	if (ret < 0) {
		ERR("Asking metadata to sessiond");
		goto end;
	}

	health_code_update();

	/* Receive the metadata from sessiond */
	ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
			sizeof(msg));
	if (ret != sizeof(msg)) {
		DBG("Consumer received unexpected message size %d (expects %zu)",
				ret, sizeof(msg));
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
		/*
		 * The ret value might 0 meaning an orderly shutdown but this is ok
		 * since the caller handles this.
		 */
		goto end;
	}

	health_code_update();

	if (msg.cmd_type == LTTNG_ERR_UND) {
		/* No registry found */
		(void) consumer_send_status_msg(ctx->consumer_metadata_socket,
				ret_code);
		ret = 0;
		goto end;
	} else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
		ERR("Unexpected cmd_type received %d", msg.cmd_type);
		ret = -1;
		goto end;
	}

	len = msg.u.push_metadata.len;
	key = msg.u.push_metadata.key;
	offset = msg.u.push_metadata.target_offset;
	version = msg.u.push_metadata.version;

	assert(key == channel->key);
	if (len == 0) {
		DBG("No new metadata to receive for key %" PRIu64, key);
	}

	health_code_update();

	/* Tell session daemon we are ready to receive the metadata. */
	ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
			LTTCOMM_CONSUMERD_SUCCESS);
	if (ret < 0 || len == 0) {
		/*
		 * Somehow, the session daemon is not responding anymore or there is
		 * nothing to receive.
		 */
		goto end;
	}

	health_code_update();

	ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
			key, offset, len, version, channel, timer, wait);
	if (ret >= 0) {
		/*
		 * Only send the status msg if the sessiond is alive meaning a positive
		 * ret code.
		 */
		(void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret);
	}
	ret = 0;

end:
	health_code_update();

	pthread_mutex_unlock(&ctx->metadata_socket_lock);
	return ret;
}
70190e1c
DG
3030
3031/*
3032 * Return the ustctl call for the get stream id.
3033 */
3034int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
3035 uint64_t *stream_id)
3036{
3037 assert(stream);
3038 assert(stream_id);
3039
3040 return ustctl_get_stream_id(stream->ustream, stream_id);
3041}
This page took 0.266686 seconds and 5 git commands to generate.