Propagate trace format all the way to the consumer
[deliverable/lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
1 /*
2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11 #include <poll.h>
12 #include <pthread.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <sys/mman.h>
16 #include <sys/socket.h>
17 #include <sys/types.h>
18 #include <inttypes.h>
19 #include <unistd.h>
20 #include <sys/stat.h>
21 #include <stdint.h>
22
23 #include <bin/lttng-consumerd/health-consumerd.hpp>
24 #include <common/common.hpp>
25 #include <common/kernel-ctl/kernel-ctl.hpp>
26 #include <common/sessiond-comm/sessiond-comm.hpp>
27 #include <common/sessiond-comm/relayd.hpp>
28 #include <common/compat/fcntl.hpp>
29 #include <common/compat/endian.hpp>
30 #include <common/pipe.hpp>
31 #include <common/relayd/relayd.hpp>
32 #include <common/utils.hpp>
33 #include <common/consumer/consumer-stream.hpp>
34 #include <common/index/index.hpp>
35 #include <common/consumer/consumer-timer.hpp>
36 #include <common/optional.hpp>
37 #include <common/buffer-view.hpp>
38 #include <common/consumer/consumer.hpp>
39 #include <common/consumer/metadata-bucket.hpp>
40
41 #include "kernel-consumer.hpp"
42
43 extern struct lttng_consumer_global_data the_consumer_data;
44 extern int consumer_poll_timeout;
45
46 /*
47 * Take a snapshot for a specific fd
48 *
49 * Returns 0 on success, < 0 on error
50 */
51 int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
52 {
53 int ret = 0;
54 int infd = stream->wait_fd;
55
56 ret = kernctl_snapshot(infd);
57 /*
58 * -EAGAIN is not an error, it just means that there is no data to
59 * be read.
60 */
61 if (ret != 0 && ret != -EAGAIN) {
62 PERROR("Getting sub-buffer snapshot.");
63 }
64
65 return ret;
66 }
67
68 /*
69 * Sample consumed and produced positions for a specific fd.
70 *
71 * Returns 0 on success, < 0 on error.
72 */
73 int lttng_kconsumer_sample_snapshot_positions(
74 struct lttng_consumer_stream *stream)
75 {
76 LTTNG_ASSERT(stream);
77
78 return kernctl_snapshot_sample_positions(stream->wait_fd);
79 }
80
81 /*
82 * Get the produced position
83 *
84 * Returns 0 on success, < 0 on error
85 */
86 int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
87 unsigned long *pos)
88 {
89 int ret;
90 int infd = stream->wait_fd;
91
92 ret = kernctl_snapshot_get_produced(infd, pos);
93 if (ret != 0) {
94 PERROR("kernctl_snapshot_get_produced");
95 }
96
97 return ret;
98 }
99
100 /*
101 * Get the consumerd position
102 *
103 * Returns 0 on success, < 0 on error
104 */
105 int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
106 unsigned long *pos)
107 {
108 int ret;
109 int infd = stream->wait_fd;
110
111 ret = kernctl_snapshot_get_consumed(infd, pos);
112 if (ret != 0) {
113 PERROR("kernctl_snapshot_get_consumed");
114 }
115
116 return ret;
117 }
118
119 static
120 int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
121 const char **addr)
122 {
123 int ret;
124 unsigned long mmap_offset;
125 const char *mmap_base = (const char *) stream->mmap_base;
126
127 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
128 if (ret < 0) {
129 PERROR("Failed to get mmap read offset");
130 goto error;
131 }
132
133 *addr = mmap_base + mmap_offset;
134 error:
135 return ret;
136 }
137
/*
 * Take a snapshot of all the streams of a channel.
 * RCU read-side lock must be held across this function to ensure existence of
 * channel.
 *
 * @channel               Channel to snapshot (must outlive the call).
 * @key                   Channel key, used for logging only.
 * @path                  Destination path sent to the relayd (relayd case).
 * @relayd_id             Relayd ID, or (uint64_t) -1ULL for local output.
 * @nb_packets_per_stream Cap on packets captured per stream (0 = no cap).
 *
 * Returns 0 on success, < 0 on error
 */
static int lttng_kconsumer_snapshot_channel(
		struct lttng_consumer_channel *channel,
		uint64_t key, char *path, uint64_t relayd_id,
		uint64_t nb_packets_per_stream)
{
	int ret;
	struct lttng_consumer_stream *stream;

	DBG("Kernel consumer snapshot channel %" PRIu64, key);

	/* Prevent channel modifications while we perform the snapshot.*/
	pthread_mutex_lock(&channel->lock);

	rcu_read_lock();

	/* Splice is not supported yet for channel snapshot. */
	if (channel->output != CONSUMER_CHANNEL_MMAP) {
		ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
				channel->name);
		ret = -1;
		goto end;
	}

	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
		unsigned long consumed_pos, produced_pos;

		health_code_update();

		/*
		 * Lock stream because we are about to change its state.
		 */
		pthread_mutex_lock(&stream->lock);

		/*
		 * Pin the channel's current trace chunk on the stream for the
		 * duration of the snapshot; released at the bottom of the loop.
		 */
		LTTNG_ASSERT(channel->trace_chunk);
		if (!lttng_trace_chunk_get(channel->trace_chunk)) {
			/*
			 * Can't happen barring an internal error as the channel
			 * holds a reference to the trace chunk.
			 */
			ERR("Failed to acquire reference to channel's trace chunk");
			ret = -1;
			goto end_unlock;
		}
		LTTNG_ASSERT(!stream->trace_chunk);
		stream->trace_chunk = channel->trace_chunk;

		/*
		 * Assign the received relayd ID so we can use it for streaming. The streams
		 * are not visible to anyone so this is OK to change it.
		 */
		stream->net_seq_idx = relayd_id;
		channel->relayd_id = relayd_id;
		if (relayd_id != (uint64_t) -1ULL) {
			/* Network output: announce the stream to the relayd. */
			ret = consumer_send_relayd_stream(stream, path);
			if (ret < 0) {
				ERR("sending stream to relayd");
				goto end_unlock;
			}
		} else {
			/* Local output: create the on-disk trace files. */
			ret = consumer_stream_create_output_files(stream,
					false);
			if (ret < 0) {
				goto end_unlock;
			}
			DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
					stream->key);
		}

		ret = kernctl_buffer_flush_empty(stream->wait_fd);
		if (ret < 0) {
			/*
			 * Doing a buffer flush which does not take into
			 * account empty packets. This is not perfect
			 * for stream intersection, but required as a
			 * fall-back when "flush_empty" is not
			 * implemented by lttng-modules.
			 */
			ret = kernctl_buffer_flush(stream->wait_fd);
			if (ret < 0) {
				ERR("Failed to flush kernel stream");
				goto end_unlock;
			}
			/*
			 * NOTE(review): this unconditional jump skips the
			 * snapshot/read loop for this stream (and the remaining
			 * streams) even when the fallback flush succeeded,
			 * returning 0 via end_unlock. Looks suspicious — confirm
			 * against upstream whether this early-out is intended.
			 */
			goto end_unlock;
		}

		/* Sample consumed/produced positions for this stream. */
		ret = lttng_kconsumer_take_snapshot(stream);
		if (ret < 0) {
			ERR("Taking kernel snapshot");
			goto end_unlock;
		}

		ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
		if (ret < 0) {
			ERR("Produced kernel snapshot position");
			goto end_unlock;
		}

		ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
		if (ret < 0) {
			ERR("Consumerd kernel snapshot position");
			goto end_unlock;
		}

		/*
		 * Move the start position forward so that at most
		 * nb_packets_per_stream packets are captured.
		 */
		consumed_pos = consumer_get_consume_start_pos(consumed_pos,
				produced_pos, nb_packets_per_stream,
				stream->max_sb_size);

		/*
		 * Signed wrap-safe comparison: iterate sub-buffer by sub-buffer
		 * until the consumed position catches up with the produced one.
		 */
		while ((long) (consumed_pos - produced_pos) < 0) {
			ssize_t read_len;
			unsigned long len, padded_len;
			const char *subbuf_addr;
			struct lttng_buffer_view subbuf_view;

			health_code_update();
			DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);

			ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
			if (ret < 0) {
				if (ret != -EAGAIN) {
					PERROR("kernctl_get_subbuf snapshot");
					goto end_unlock;
				}
				/* -EAGAIN: sub-buffer unavailable; count it as lost. */
				DBG("Kernel consumer get subbuf failed. Skipping it.");
				consumed_pos += stream->max_sb_size;
				stream->chan->lost_packets++;
				continue;
			}

			ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
			if (ret < 0) {
				ERR("Snapshot kernctl_get_subbuf_size");
				goto error_put_subbuf;
			}

			ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
			if (ret < 0) {
				ERR("Snapshot kernctl_get_padded_subbuf_size");
				goto error_put_subbuf;
			}

			ret = get_current_subbuf_addr(stream, &subbuf_addr);
			if (ret) {
				goto error_put_subbuf;
			}

			subbuf_view = lttng_buffer_view_init(
					subbuf_addr, 0, padded_len);
			read_len = lttng_consumer_on_read_subbuffer_mmap(
					stream, &subbuf_view,
					padded_len - len);
			/*
			 * We write the padded len in local tracefiles but the data len
			 * when using a relay. Display the error but continue processing
			 * to try to release the subbuffer.
			 */
			if (relayd_id != (uint64_t) -1ULL) {
				if (read_len != len) {
					ERR("Error sending to the relay (ret: %zd != len: %lu)",
							read_len, len);
				}
			} else {
				if (read_len != padded_len) {
					ERR("Error writing to tracefile (ret: %zd != len: %lu)",
							read_len, padded_len);
				}
			}

			ret = kernctl_put_subbuf(stream->wait_fd);
			if (ret < 0) {
				ERR("Snapshot kernctl_put_subbuf");
				goto end_unlock;
			}
			consumed_pos += stream->max_sb_size;
		}

		/* Tear down this stream's snapshot output. */
		if (relayd_id == (uint64_t) -1ULL) {
			if (stream->out_fd >= 0) {
				ret = close(stream->out_fd);
				if (ret < 0) {
					PERROR("Kernel consumer snapshot close out_fd");
					goto end_unlock;
				}
				stream->out_fd = -1;
			}
		} else {
			close_relayd_stream(stream);
			stream->net_seq_idx = (uint64_t) -1ULL;
		}
		/* Drop the trace chunk reference taken at the top of the loop. */
		lttng_trace_chunk_put(stream->trace_chunk);
		stream->trace_chunk = NULL;
		pthread_mutex_unlock(&stream->lock);
	}

	/* All good! */
	ret = 0;
	goto end;

error_put_subbuf:
	/* Release the sub-buffer before unlocking; preserve original error in logs. */
	ret = kernctl_put_subbuf(stream->wait_fd);
	if (ret < 0) {
		ERR("Snapshot kernctl_put_subbuf error path");
	}
end_unlock:
	pthread_mutex_unlock(&stream->lock);
end:
	rcu_read_unlock();
	pthread_mutex_unlock(&channel->lock);
	return ret;
}
354
/*
 * Read the whole metadata available for a snapshot.
 * RCU read-side lock must be held across this function to ensure existence of
 * metadata_channel.
 *
 * @metadata_channel Metadata channel; its metadata_stream must be set and is
 *                   destroyed (and the field cleared) before returning, on
 *                   both success and error paths.
 * @key              Channel key, used for logging only.
 * @path             Destination path sent to the relayd (relayd case).
 * @relayd_id        Relayd ID, or (uint64_t) -1ULL for local output.
 * @ctx              Consumer context used to read the metadata sub-buffers.
 *
 * Returns 0 on success, < 0 on error
 */
static int lttng_kconsumer_snapshot_metadata(
		struct lttng_consumer_channel *metadata_channel,
		uint64_t key, char *path, uint64_t relayd_id,
		struct lttng_consumer_local_data *ctx)
{
	int ret, use_relayd = 0;
	ssize_t ret_read;
	struct lttng_consumer_stream *metadata_stream;

	LTTNG_ASSERT(ctx);

	DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
			key, path);

	rcu_read_lock();

	metadata_stream = metadata_channel->metadata_stream;
	LTTNG_ASSERT(metadata_stream);

	/* Held until error_snapshot; serializes with the metadata read path. */
	metadata_stream->read_subbuffer_ops.lock(metadata_stream);
	LTTNG_ASSERT(metadata_channel->trace_chunk);
	LTTNG_ASSERT(metadata_stream->trace_chunk);

	/* Flag once that we have a valid relayd for the stream. */
	if (relayd_id != (uint64_t) -1ULL) {
		use_relayd = 1;
	}

	if (use_relayd) {
		/* Network output: announce the stream to the relayd. */
		ret = consumer_send_relayd_stream(metadata_stream, path);
		if (ret < 0) {
			goto error_snapshot;
		}
	} else {
		/* Local output: create the on-disk metadata file. */
		ret = consumer_stream_create_output_files(metadata_stream,
				false);
		if (ret < 0) {
			goto error_snapshot;
		}
	}

	/* Drain all available metadata; 0 means end of metadata. */
	do {
		health_code_update();

		ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
		if (ret_read < 0) {
			ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
					ret_read);
			ret = ret_read;
			goto error_snapshot;
		}
	} while (ret_read > 0);

	if (use_relayd) {
		close_relayd_stream(metadata_stream);
		metadata_stream->net_seq_idx = (uint64_t) -1ULL;
	} else {
		if (metadata_stream->out_fd >= 0) {
			ret = close(metadata_stream->out_fd);
			if (ret < 0) {
				PERROR("Kernel consumer snapshot metadata close out_fd");
				/*
				 * Don't go on error here since the snapshot was successful at this
				 * point but somehow the close failed.
				 */
			}
			metadata_stream->out_fd = -1;
			lttng_trace_chunk_put(metadata_stream->trace_chunk);
			metadata_stream->trace_chunk = NULL;
		}
	}

	ret = 0;
error_snapshot:
	/* Common teardown: the metadata stream is destroyed on every path. */
	metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
	consumer_stream_destroy(metadata_stream, NULL);
	metadata_channel->metadata_stream = NULL;
	rcu_read_unlock();
	return ret;
}
442
443 /*
444 * Receive command from session daemon and process it.
445 *
446 * Return 1 on success else a negative value or 0.
447 */
448 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
449 int sock, struct pollfd *consumer_sockpoll)
450 {
451 int ret_func;
452 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
453 struct lttcomm_consumer_msg msg;
454
455 health_code_update();
456
457 {
458 ssize_t ret_recv;
459
460 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
461 if (ret_recv != sizeof(msg)) {
462 if (ret_recv > 0) {
463 lttng_consumer_send_error(ctx,
464 LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
465 ret_recv = -1;
466 }
467 return ret_recv;
468 }
469 }
470
471 health_code_update();
472
473 /* Deprecated command */
474 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
475
476 health_code_update();
477
478 /* relayd needs RCU read-side protection */
479 rcu_read_lock();
480
481 switch (msg.cmd_type) {
482 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
483 {
484 uint32_t major = msg.u.relayd_sock.major;
485 uint32_t minor = msg.u.relayd_sock.minor;
486 enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto)
487 msg.u.relayd_sock.relayd_socket_protocol;
488
489 /* Session daemon status message are handled in the following call. */
490 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
491 msg.u.relayd_sock.type, ctx, sock,
492 consumer_sockpoll, msg.u.relayd_sock.session_id,
493 msg.u.relayd_sock.relayd_session_id, major,
494 minor, protocol);
495 goto end_nosignal;
496 }
497 case LTTNG_CONSUMER_ADD_CHANNEL:
498 {
499 struct lttng_consumer_channel *new_channel;
500 int ret_send_status, ret_add_channel = 0;
501 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
502
503 health_code_update();
504
505 /* First send a status message before receiving the fds. */
506 ret_send_status = consumer_send_status_msg(sock, ret_code);
507 if (ret_send_status < 0) {
508 /* Somehow, the session daemon is not responding anymore. */
509 goto error_fatal;
510 }
511
512 health_code_update();
513
514 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
515 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
516 msg.u.channel.session_id,
517 msg.u.channel.chunk_id.is_set ? &chunk_id : NULL,
518 msg.u.channel.pathname, msg.u.channel.name, msg.u.channel.relayd_id,
519 msg.u.channel.output, msg.u.channel.tracefile_size,
520 msg.u.channel.tracefile_count, 0, msg.u.channel.monitor,
521 msg.u.channel.live_timer_interval, msg.u.channel.is_live, NULL,
522 NULL, msg.u.channel.trace_format);
523 if (new_channel == NULL) {
524 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
525 goto end_nosignal;
526 }
527 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
528 switch (msg.u.channel.output) {
529 case LTTNG_EVENT_SPLICE:
530 new_channel->output = CONSUMER_CHANNEL_SPLICE;
531 break;
532 case LTTNG_EVENT_MMAP:
533 new_channel->output = CONSUMER_CHANNEL_MMAP;
534 break;
535 default:
536 ERR("Channel output unknown %d", msg.u.channel.output);
537 goto end_nosignal;
538 }
539
540 /* Translate and save channel type. */
541 switch (msg.u.channel.type) {
542 case CONSUMER_CHANNEL_TYPE_DATA:
543 case CONSUMER_CHANNEL_TYPE_METADATA:
544 new_channel->type = (consumer_channel_type) msg.u.channel.type;
545 break;
546 default:
547 abort();
548 goto end_nosignal;
549 };
550
551 health_code_update();
552
553 if (ctx->on_recv_channel != NULL) {
554 int ret_recv_channel =
555 ctx->on_recv_channel(new_channel);
556 if (ret_recv_channel == 0) {
557 ret_add_channel = consumer_add_channel(
558 new_channel, ctx);
559 } else if (ret_recv_channel < 0) {
560 goto end_nosignal;
561 }
562 } else {
563 ret_add_channel =
564 consumer_add_channel(new_channel, ctx);
565 }
566 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
567 !ret_add_channel) {
568 int monitor_start_ret;
569
570 DBG("Consumer starting monitor timer");
571 consumer_timer_live_start(new_channel,
572 msg.u.channel.live_timer_interval);
573 monitor_start_ret = consumer_timer_monitor_start(
574 new_channel,
575 msg.u.channel.monitor_timer_interval);
576 if (monitor_start_ret < 0) {
577 ERR("Starting channel monitoring timer failed");
578 goto end_nosignal;
579 }
580 }
581
582 health_code_update();
583
584 /* If we received an error in add_channel, we need to report it. */
585 if (ret_add_channel < 0) {
586 ret_send_status = consumer_send_status_msg(
587 sock, ret_add_channel);
588 if (ret_send_status < 0) {
589 goto error_fatal;
590 }
591 goto end_nosignal;
592 }
593
594 goto end_nosignal;
595 }
596 case LTTNG_CONSUMER_ADD_STREAM:
597 {
598 int fd;
599 struct lttng_pipe *stream_pipe;
600 struct lttng_consumer_stream *new_stream;
601 struct lttng_consumer_channel *channel;
602 int alloc_ret = 0;
603 int ret_send_status, ret_poll, ret_get_max_subbuf_size;
604 ssize_t ret_pipe_write, ret_recv;
605
606 /*
607 * Get stream's channel reference. Needed when adding the stream to the
608 * global hash table.
609 */
610 channel = consumer_find_channel(msg.u.stream.channel_key);
611 if (!channel) {
612 /*
613 * We could not find the channel. Can happen if cpu hotplug
614 * happens while tearing down.
615 */
616 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
617 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
618 }
619
620 health_code_update();
621
622 /* First send a status message before receiving the fds. */
623 ret_send_status = consumer_send_status_msg(sock, ret_code);
624 if (ret_send_status < 0) {
625 /* Somehow, the session daemon is not responding anymore. */
626 goto error_add_stream_fatal;
627 }
628
629 health_code_update();
630
631 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
632 /* Channel was not found. */
633 goto error_add_stream_nosignal;
634 }
635
636 /* Blocking call */
637 health_poll_entry();
638 ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
639 health_poll_exit();
640 if (ret_poll) {
641 goto error_add_stream_fatal;
642 }
643
644 health_code_update();
645
646 /* Get stream file descriptor from socket */
647 ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
648 if (ret_recv != sizeof(fd)) {
649 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
650 ret_func = ret_recv;
651 goto end;
652 }
653
654 health_code_update();
655
656 /*
657 * Send status code to session daemon only if the recv works. If the
658 * above recv() failed, the session daemon is notified through the
659 * error socket and the teardown is eventually done.
660 */
661 ret_send_status = consumer_send_status_msg(sock, ret_code);
662 if (ret_send_status < 0) {
663 /* Somehow, the session daemon is not responding anymore. */
664 goto error_add_stream_nosignal;
665 }
666
667 health_code_update();
668
669 pthread_mutex_lock(&channel->lock);
670 new_stream = consumer_stream_create(channel, channel->key, fd, channel->name,
671 channel->relayd_id, channel->session_id, channel->trace_chunk,
672 msg.u.stream.cpu, &alloc_ret, channel->type, channel->monitor,
673 channel->trace_format);
674 if (new_stream == NULL) {
675 switch (alloc_ret) {
676 case -ENOMEM:
677 case -EINVAL:
678 default:
679 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
680 break;
681 }
682 pthread_mutex_unlock(&channel->lock);
683 goto error_add_stream_nosignal;
684 }
685
686 new_stream->wait_fd = fd;
687 ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
688 new_stream->wait_fd, &new_stream->max_sb_size);
689 if (ret_get_max_subbuf_size < 0) {
690 pthread_mutex_unlock(&channel->lock);
691 ERR("Failed to get kernel maximal subbuffer size");
692 goto error_add_stream_nosignal;
693 }
694
695 consumer_stream_update_channel_attributes(new_stream,
696 channel);
697
698 /*
699 * We've just assigned the channel to the stream so increment the
700 * refcount right now. We don't need to increment the refcount for
701 * streams in no monitor because we handle manually the cleanup of
702 * those. It is very important to make sure there is NO prior
703 * consumer_del_stream() calls or else the refcount will be unbalanced.
704 */
705 if (channel->monitor) {
706 uatomic_inc(&new_stream->chan->refcount);
707 }
708
709 /*
710 * The buffer flush is done on the session daemon side for the kernel
711 * so no need for the stream "hangup_flush_done" variable to be
712 * tracked. This is important for a kernel stream since we don't rely
713 * on the flush state of the stream to read data. It's not the case for
714 * user space tracing.
715 */
716 new_stream->hangup_flush_done = 0;
717
718 health_code_update();
719
720 pthread_mutex_lock(&new_stream->lock);
721 if (ctx->on_recv_stream) {
722 int ret_recv_stream = ctx->on_recv_stream(new_stream);
723 if (ret_recv_stream < 0) {
724 pthread_mutex_unlock(&new_stream->lock);
725 pthread_mutex_unlock(&channel->lock);
726 consumer_stream_free(new_stream);
727 goto error_add_stream_nosignal;
728 }
729 }
730 health_code_update();
731
732 if (new_stream->metadata_flag) {
733 channel->metadata_stream = new_stream;
734 }
735
736 /* Do not monitor this stream. */
737 if (!channel->monitor) {
738 DBG("Kernel consumer add stream %s in no monitor mode with "
739 "relayd id %" PRIu64, new_stream->name,
740 new_stream->net_seq_idx);
741 cds_list_add(&new_stream->send_node, &channel->streams.head);
742 pthread_mutex_unlock(&new_stream->lock);
743 pthread_mutex_unlock(&channel->lock);
744 goto end_add_stream;
745 }
746
747 /* Send stream to relayd if the stream has an ID. */
748 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
749 int ret_send_relayd_stream;
750
751 ret_send_relayd_stream = consumer_send_relayd_stream(
752 new_stream, new_stream->chan->pathname);
753 if (ret_send_relayd_stream < 0) {
754 pthread_mutex_unlock(&new_stream->lock);
755 pthread_mutex_unlock(&channel->lock);
756 consumer_stream_free(new_stream);
757 goto error_add_stream_nosignal;
758 }
759
760 /*
761 * If adding an extra stream to an already
762 * existing channel (e.g. cpu hotplug), we need
763 * to send the "streams_sent" command to relayd.
764 */
765 if (channel->streams_sent_to_relayd) {
766 int ret_send_relayd_streams_sent;
767
768 ret_send_relayd_streams_sent =
769 consumer_send_relayd_streams_sent(
770 new_stream->net_seq_idx);
771 if (ret_send_relayd_streams_sent < 0) {
772 pthread_mutex_unlock(&new_stream->lock);
773 pthread_mutex_unlock(&channel->lock);
774 goto error_add_stream_nosignal;
775 }
776 }
777 }
778 pthread_mutex_unlock(&new_stream->lock);
779 pthread_mutex_unlock(&channel->lock);
780
781 /* Get the right pipe where the stream will be sent. */
782 if (new_stream->metadata_flag) {
783 consumer_add_metadata_stream(new_stream);
784 stream_pipe = ctx->consumer_metadata_pipe;
785 } else {
786 consumer_add_data_stream(new_stream);
787 stream_pipe = ctx->consumer_data_pipe;
788 }
789
790 /* Visible to other threads */
791 new_stream->globally_visible = 1;
792
793 health_code_update();
794
795 ret_pipe_write = lttng_pipe_write(
796 stream_pipe, &new_stream, sizeof(new_stream));
797 if (ret_pipe_write < 0) {
798 ERR("Consumer write %s stream to pipe %d",
799 new_stream->metadata_flag ? "metadata" : "data",
800 lttng_pipe_get_writefd(stream_pipe));
801 if (new_stream->metadata_flag) {
802 consumer_del_stream_for_metadata(new_stream);
803 } else {
804 consumer_del_stream_for_data(new_stream);
805 }
806 goto error_add_stream_nosignal;
807 }
808
809 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
810 new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
811 end_add_stream:
812 break;
813 error_add_stream_nosignal:
814 goto end_nosignal;
815 error_add_stream_fatal:
816 goto error_fatal;
817 }
818 case LTTNG_CONSUMER_STREAMS_SENT:
819 {
820 struct lttng_consumer_channel *channel;
821 int ret_send_status;
822
823 /*
824 * Get stream's channel reference. Needed when adding the stream to the
825 * global hash table.
826 */
827 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
828 if (!channel) {
829 /*
830 * We could not find the channel. Can happen if cpu hotplug
831 * happens while tearing down.
832 */
833 ERR("Unable to find channel key %" PRIu64,
834 msg.u.sent_streams.channel_key);
835 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
836 }
837
838 health_code_update();
839
840 /*
841 * Send status code to session daemon.
842 */
843 ret_send_status = consumer_send_status_msg(sock, ret_code);
844 if (ret_send_status < 0 ||
845 ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
846 /* Somehow, the session daemon is not responding anymore. */
847 goto error_streams_sent_nosignal;
848 }
849
850 health_code_update();
851
852 /*
853 * We should not send this message if we don't monitor the
854 * streams in this channel.
855 */
856 if (!channel->monitor) {
857 goto end_error_streams_sent;
858 }
859
860 health_code_update();
861 /* Send stream to relayd if the stream has an ID. */
862 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
863 int ret_send_relay_streams;
864
865 ret_send_relay_streams = consumer_send_relayd_streams_sent(
866 msg.u.sent_streams.net_seq_idx);
867 if (ret_send_relay_streams < 0) {
868 goto error_streams_sent_nosignal;
869 }
870 channel->streams_sent_to_relayd = true;
871 }
872 end_error_streams_sent:
873 break;
874 error_streams_sent_nosignal:
875 goto end_nosignal;
876 }
877 case LTTNG_CONSUMER_UPDATE_STREAM:
878 {
879 rcu_read_unlock();
880 return -ENOSYS;
881 }
882 case LTTNG_CONSUMER_DESTROY_RELAYD:
883 {
884 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
885 struct consumer_relayd_sock_pair *relayd;
886 int ret_send_status;
887
888 DBG("Kernel consumer destroying relayd %" PRIu64, index);
889
890 /* Get relayd reference if exists. */
891 relayd = consumer_find_relayd(index);
892 if (relayd == NULL) {
893 DBG("Unable to find relayd %" PRIu64, index);
894 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
895 }
896
897 /*
898 * Each relayd socket pair has a refcount of stream attached to it
899 * which tells if the relayd is still active or not depending on the
900 * refcount value.
901 *
902 * This will set the destroy flag of the relayd object and destroy it
903 * if the refcount reaches zero when called.
904 *
905 * The destroy can happen either here or when a stream fd hangs up.
906 */
907 if (relayd) {
908 consumer_flag_relayd_for_destroy(relayd);
909 }
910
911 health_code_update();
912
913 ret_send_status = consumer_send_status_msg(sock, ret_code);
914 if (ret_send_status < 0) {
915 /* Somehow, the session daemon is not responding anymore. */
916 goto error_fatal;
917 }
918
919 goto end_nosignal;
920 }
921 case LTTNG_CONSUMER_DATA_PENDING:
922 {
923 int32_t ret_data_pending;
924 uint64_t id = msg.u.data_pending.session_id;
925 ssize_t ret_send;
926
927 DBG("Kernel consumer data pending command for id %" PRIu64, id);
928
929 ret_data_pending = consumer_data_pending(id);
930
931 health_code_update();
932
933 /* Send back returned value to session daemon */
934 ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
935 sizeof(ret_data_pending));
936 if (ret_send < 0) {
937 PERROR("send data pending ret code");
938 goto error_fatal;
939 }
940
941 /*
942 * No need to send back a status message since the data pending
943 * returned value is the response.
944 */
945 break;
946 }
947 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
948 {
949 struct lttng_consumer_channel *channel;
950 uint64_t key = msg.u.snapshot_channel.key;
951 int ret_send_status;
952
953 channel = consumer_find_channel(key);
954 if (!channel) {
955 ERR("Channel %" PRIu64 " not found", key);
956 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
957 } else {
958 if (msg.u.snapshot_channel.metadata == 1) {
959 int ret_snapshot;
960
961 ret_snapshot = lttng_kconsumer_snapshot_metadata(
962 channel, key,
963 msg.u.snapshot_channel.pathname,
964 msg.u.snapshot_channel.relayd_id,
965 ctx);
966 if (ret_snapshot < 0) {
967 ERR("Snapshot metadata failed");
968 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
969 }
970 } else {
971 int ret_snapshot;
972
973 ret_snapshot = lttng_kconsumer_snapshot_channel(
974 channel, key,
975 msg.u.snapshot_channel.pathname,
976 msg.u.snapshot_channel.relayd_id,
977 msg.u.snapshot_channel
978 .nb_packets_per_stream);
979 if (ret_snapshot < 0) {
980 ERR("Snapshot channel failed");
981 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
982 }
983 }
984 }
985 health_code_update();
986
987 ret_send_status = consumer_send_status_msg(sock, ret_code);
988 if (ret_send_status < 0) {
989 /* Somehow, the session daemon is not responding anymore. */
990 goto end_nosignal;
991 }
992 break;
993 }
994 case LTTNG_CONSUMER_DESTROY_CHANNEL:
995 {
996 uint64_t key = msg.u.destroy_channel.key;
997 struct lttng_consumer_channel *channel;
998 int ret_send_status;
999
1000 channel = consumer_find_channel(key);
1001 if (!channel) {
1002 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
1003 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1004 }
1005
1006 health_code_update();
1007
1008 ret_send_status = consumer_send_status_msg(sock, ret_code);
1009 if (ret_send_status < 0) {
1010 /* Somehow, the session daemon is not responding anymore. */
1011 goto end_destroy_channel;
1012 }
1013
1014 health_code_update();
1015
1016 /* Stop right now if no channel was found. */
1017 if (!channel) {
1018 goto end_destroy_channel;
1019 }
1020
1021 /*
1022 * This command should ONLY be issued for channel with streams set in
1023 * no monitor mode.
1024 */
1025 LTTNG_ASSERT(!channel->monitor);
1026
1027 /*
1028 * The refcount should ALWAYS be 0 in the case of a channel in no
1029 * monitor mode.
1030 */
1031 LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
1032
1033 consumer_del_channel(channel);
1034 end_destroy_channel:
1035 goto end_nosignal;
1036 }
1037 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1038 {
1039 ssize_t ret;
1040 uint64_t count;
1041 struct lttng_consumer_channel *channel;
1042 uint64_t id = msg.u.discarded_events.session_id;
1043 uint64_t key = msg.u.discarded_events.channel_key;
1044
1045 DBG("Kernel consumer discarded events command for session id %"
1046 PRIu64 ", channel key %" PRIu64, id, key);
1047
1048 channel = consumer_find_channel(key);
1049 if (!channel) {
1050 ERR("Kernel consumer discarded events channel %"
1051 PRIu64 " not found", key);
1052 count = 0;
1053 } else {
1054 count = channel->discarded_events;
1055 }
1056
1057 health_code_update();
1058
1059 /* Send back returned value to session daemon */
1060 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1061 if (ret < 0) {
1062 PERROR("send discarded events");
1063 goto error_fatal;
1064 }
1065
1066 break;
1067 }
1068 case LTTNG_CONSUMER_LOST_PACKETS:
1069 {
1070 ssize_t ret;
1071 uint64_t count;
1072 struct lttng_consumer_channel *channel;
1073 uint64_t id = msg.u.lost_packets.session_id;
1074 uint64_t key = msg.u.lost_packets.channel_key;
1075
1076 DBG("Kernel consumer lost packets command for session id %"
1077 PRIu64 ", channel key %" PRIu64, id, key);
1078
1079 channel = consumer_find_channel(key);
1080 if (!channel) {
1081 ERR("Kernel consumer lost packets channel %"
1082 PRIu64 " not found", key);
1083 count = 0;
1084 } else {
1085 count = channel->lost_packets;
1086 }
1087
1088 health_code_update();
1089
1090 /* Send back returned value to session daemon */
1091 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1092 if (ret < 0) {
1093 PERROR("send lost packets");
1094 goto error_fatal;
1095 }
1096
1097 break;
1098 }
1099 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1100 {
1101 int channel_monitor_pipe;
1102 int ret_send_status, ret_set_channel_monitor_pipe;
1103 ssize_t ret_recv;
1104
1105 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1106 /* Successfully received the command's type. */
1107 ret_send_status = consumer_send_status_msg(sock, ret_code);
1108 if (ret_send_status < 0) {
1109 goto error_fatal;
1110 }
1111
1112 ret_recv = lttcomm_recv_fds_unix_sock(
1113 sock, &channel_monitor_pipe, 1);
1114 if (ret_recv != sizeof(channel_monitor_pipe)) {
1115 ERR("Failed to receive channel monitor pipe");
1116 goto error_fatal;
1117 }
1118
1119 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
1120 ret_set_channel_monitor_pipe =
1121 consumer_timer_thread_set_channel_monitor_pipe(
1122 channel_monitor_pipe);
1123 if (!ret_set_channel_monitor_pipe) {
1124 int flags;
1125 int ret_fcntl;
1126
1127 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1128 /* Set the pipe as non-blocking. */
1129 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
1130 if (ret_fcntl == -1) {
1131 PERROR("fcntl get flags of the channel monitoring pipe");
1132 goto error_fatal;
1133 }
1134 flags = ret_fcntl;
1135
1136 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
1137 flags | O_NONBLOCK);
1138 if (ret_fcntl == -1) {
1139 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1140 goto error_fatal;
1141 }
1142 DBG("Channel monitor pipe set as non-blocking");
1143 } else {
1144 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1145 }
1146 ret_send_status = consumer_send_status_msg(sock, ret_code);
1147 if (ret_send_status < 0) {
1148 goto error_fatal;
1149 }
1150 break;
1151 }
1152 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1153 {
1154 struct lttng_consumer_channel *channel;
1155 uint64_t key = msg.u.rotate_channel.key;
1156 int ret_send_status;
1157
1158 DBG("Consumer rotate channel %" PRIu64, key);
1159
1160 channel = consumer_find_channel(key);
1161 if (!channel) {
1162 ERR("Channel %" PRIu64 " not found", key);
1163 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1164 } else {
1165 /*
1166 * Sample the rotate position of all the streams in this channel.
1167 */
1168 int ret_rotate_channel;
1169
1170 ret_rotate_channel = lttng_consumer_rotate_channel(
1171 channel, key,
1172 msg.u.rotate_channel.relayd_id);
1173 if (ret_rotate_channel < 0) {
1174 ERR("Rotate channel failed");
1175 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1176 }
1177
1178 health_code_update();
1179 }
1180
1181 ret_send_status = consumer_send_status_msg(sock, ret_code);
1182 if (ret_send_status < 0) {
1183 /* Somehow, the session daemon is not responding anymore. */
1184 goto error_rotate_channel;
1185 }
1186 if (channel) {
1187 /* Rotate the streams that are ready right now. */
1188 int ret_rotate;
1189
1190 ret_rotate = lttng_consumer_rotate_ready_streams(
1191 channel, key);
1192 if (ret_rotate < 0) {
1193 ERR("Rotate ready streams failed");
1194 }
1195 }
1196 break;
1197 error_rotate_channel:
1198 goto end_nosignal;
1199 }
1200 case LTTNG_CONSUMER_CLEAR_CHANNEL:
1201 {
1202 struct lttng_consumer_channel *channel;
1203 uint64_t key = msg.u.clear_channel.key;
1204 int ret_send_status;
1205
1206 channel = consumer_find_channel(key);
1207 if (!channel) {
1208 DBG("Channel %" PRIu64 " not found", key);
1209 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1210 } else {
1211 int ret_clear_channel;
1212
1213 ret_clear_channel =
1214 lttng_consumer_clear_channel(channel);
1215 if (ret_clear_channel) {
1216 ERR("Clear channel failed");
1217 ret_code = (lttcomm_return_code) ret_clear_channel;
1218 }
1219
1220 health_code_update();
1221 }
1222
1223 ret_send_status = consumer_send_status_msg(sock, ret_code);
1224 if (ret_send_status < 0) {
1225 /* Somehow, the session daemon is not responding anymore. */
1226 goto end_nosignal;
1227 }
1228
1229 break;
1230 }
1231 case LTTNG_CONSUMER_INIT:
1232 {
1233 int ret_send_status;
1234 lttng_uuid sessiond_uuid;
1235
1236 std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
1237 sessiond_uuid.begin());
1238
1239 ret_code = lttng_consumer_init_command(ctx,
1240 sessiond_uuid);
1241 health_code_update();
1242 ret_send_status = consumer_send_status_msg(sock, ret_code);
1243 if (ret_send_status < 0) {
1244 /* Somehow, the session daemon is not responding anymore. */
1245 goto end_nosignal;
1246 }
1247 break;
1248 }
1249 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
1250 {
1251 const struct lttng_credentials credentials = {
1252 .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
1253 .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
1254 };
1255 const bool is_local_trace =
1256 !msg.u.create_trace_chunk.relayd_id.is_set;
1257 const uint64_t relayd_id =
1258 msg.u.create_trace_chunk.relayd_id.value;
1259 const char *chunk_override_name =
1260 *msg.u.create_trace_chunk.override_name ?
1261 msg.u.create_trace_chunk.override_name :
1262 NULL;
1263 struct lttng_directory_handle *chunk_directory_handle = NULL;
1264
1265 /*
1266 * The session daemon will only provide a chunk directory file
1267 * descriptor for local traces.
1268 */
1269 if (is_local_trace) {
1270 int chunk_dirfd;
1271 int ret_send_status;
1272 ssize_t ret_recv;
1273
1274 				/* Acknowledge the reception of the command. */
1275 ret_send_status = consumer_send_status_msg(
1276 sock, LTTCOMM_CONSUMERD_SUCCESS);
1277 if (ret_send_status < 0) {
1278 /* Somehow, the session daemon is not responding anymore. */
1279 goto end_nosignal;
1280 }
1281
1282 ret_recv = lttcomm_recv_fds_unix_sock(
1283 sock, &chunk_dirfd, 1);
1284 if (ret_recv != sizeof(chunk_dirfd)) {
1285 ERR("Failed to receive trace chunk directory file descriptor");
1286 goto error_fatal;
1287 }
1288
1289 DBG("Received trace chunk directory fd (%d)",
1290 chunk_dirfd);
1291 chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
1292 chunk_dirfd);
1293 if (!chunk_directory_handle) {
1294 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1295 if (close(chunk_dirfd)) {
1296 PERROR("Failed to close chunk directory file descriptor");
1297 }
1298 goto error_fatal;
1299 }
1300 }
1301
1302 ret_code = lttng_consumer_create_trace_chunk(
1303 !is_local_trace ? &relayd_id : NULL,
1304 msg.u.create_trace_chunk.session_id,
1305 msg.u.create_trace_chunk.chunk_id,
1306 (time_t) msg.u.create_trace_chunk
1307 .creation_timestamp,
1308 chunk_override_name,
1309 msg.u.create_trace_chunk.credentials.is_set ?
1310 &credentials :
1311 NULL,
1312 chunk_directory_handle);
1313 lttng_directory_handle_put(chunk_directory_handle);
1314 goto end_msg_sessiond;
1315 }
1316 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
1317 {
1318 enum lttng_trace_chunk_command_type close_command =
1319 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
1320 const uint64_t relayd_id =
1321 msg.u.close_trace_chunk.relayd_id.value;
1322 struct lttcomm_consumer_close_trace_chunk_reply reply;
1323 char path[LTTNG_PATH_MAX];
1324 ssize_t ret_send;
1325
1326 ret_code = lttng_consumer_close_trace_chunk(
1327 msg.u.close_trace_chunk.relayd_id.is_set ?
1328 &relayd_id :
1329 NULL,
1330 msg.u.close_trace_chunk.session_id,
1331 msg.u.close_trace_chunk.chunk_id,
1332 (time_t) msg.u.close_trace_chunk.close_timestamp,
1333 msg.u.close_trace_chunk.close_command.is_set ?
1334 &close_command :
1335 NULL, path);
1336 reply.ret_code = ret_code;
1337 reply.path_length = strlen(path) + 1;
1338 ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1339 if (ret_send != sizeof(reply)) {
1340 goto error_fatal;
1341 }
1342 ret_send = lttcomm_send_unix_sock(
1343 sock, path, reply.path_length);
1344 if (ret_send != reply.path_length) {
1345 goto error_fatal;
1346 }
1347 goto end_nosignal;
1348 }
1349 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
1350 {
1351 const uint64_t relayd_id =
1352 msg.u.trace_chunk_exists.relayd_id.value;
1353
1354 ret_code = lttng_consumer_trace_chunk_exists(
1355 msg.u.trace_chunk_exists.relayd_id.is_set ?
1356 &relayd_id : NULL,
1357 msg.u.trace_chunk_exists.session_id,
1358 msg.u.trace_chunk_exists.chunk_id);
1359 goto end_msg_sessiond;
1360 }
1361 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
1362 {
1363 const uint64_t key = msg.u.open_channel_packets.key;
1364 struct lttng_consumer_channel *channel =
1365 consumer_find_channel(key);
1366
1367 if (channel) {
1368 pthread_mutex_lock(&channel->lock);
1369 ret_code = lttng_consumer_open_channel_packets(channel);
1370 pthread_mutex_unlock(&channel->lock);
1371 } else {
1372 WARN("Channel %" PRIu64 " not found", key);
1373 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1374 }
1375
1376 health_code_update();
1377 goto end_msg_sessiond;
1378 }
1379 default:
1380 goto end_nosignal;
1381 }
1382
1383 end_nosignal:
1384 /*
1385 * Return 1 to indicate success since the 0 value can be a socket
1386 * shutdown during the recv() or send() call.
1387 */
1388 ret_func = 1;
1389 goto end;
1390 error_fatal:
1391 /* This will issue a consumer stop. */
1392 ret_func = -1;
1393 goto end;
1394 end_msg_sessiond:
1395 /*
1396 * The returned value here is not useful since either way we'll return 1 to
1397 * the caller because the session daemon socket management is done
1398 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1399 */
1400 {
1401 int ret_send_status;
1402
1403 ret_send_status = consumer_send_status_msg(sock, ret_code);
1404 if (ret_send_status < 0) {
1405 goto error_fatal;
1406 }
1407 }
1408
1409 ret_func = 1;
1410
1411 end:
1412 health_code_update();
1413 rcu_read_unlock();
1414 return ret_func;
1415 }
1416
1417 /*
1418 * Sync metadata meaning request them to the session daemon and snapshot to the
1419 * metadata thread can consumer them.
1420 *
1421 * Metadata stream lock MUST be acquired.
1422 */
1423 enum sync_metadata_status lttng_kconsumer_sync_metadata(
1424 struct lttng_consumer_stream *metadata)
1425 {
1426 int ret;
1427 enum sync_metadata_status status;
1428
1429 LTTNG_ASSERT(metadata);
1430
1431 ret = kernctl_buffer_flush(metadata->wait_fd);
1432 if (ret < 0) {
1433 ERR("Failed to flush kernel stream");
1434 status = SYNC_METADATA_STATUS_ERROR;
1435 goto end;
1436 }
1437
1438 ret = kernctl_snapshot(metadata->wait_fd);
1439 if (ret < 0) {
1440 if (errno == EAGAIN) {
1441 /* No new metadata, exit. */
1442 DBG("Sync metadata, no new kernel metadata");
1443 status = SYNC_METADATA_STATUS_NO_DATA;
1444 } else {
1445 ERR("Sync metadata, taking kernel snapshot failed.");
1446 status = SYNC_METADATA_STATUS_ERROR;
1447 }
1448 } else {
1449 status = SYNC_METADATA_STATUS_NEW_DATA;
1450 }
1451
1452 end:
1453 return status;
1454 }
1455
1456 static
1457 int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
1458 struct stream_subbuffer *subbuf)
1459 {
1460 int ret;
1461
1462 ret = kernctl_get_subbuf_size(
1463 stream->wait_fd, &subbuf->info.data.subbuf_size);
1464 if (ret) {
1465 goto end;
1466 }
1467
1468 ret = kernctl_get_padded_subbuf_size(
1469 stream->wait_fd, &subbuf->info.data.padded_subbuf_size);
1470 if (ret) {
1471 goto end;
1472 }
1473
1474 end:
1475 return ret;
1476 }
1477
1478 static
1479 int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
1480 struct stream_subbuffer *subbuf)
1481 {
1482 int ret;
1483
1484 ret = extract_common_subbuffer_info(stream, subbuf);
1485 if (ret) {
1486 goto end;
1487 }
1488
1489 ret = kernctl_get_metadata_version(
1490 stream->wait_fd, &subbuf->info.metadata.version);
1491 if (ret) {
1492 goto end;
1493 }
1494
1495 end:
1496 return ret;
1497 }
1498
1499 static
1500 int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
1501 struct stream_subbuffer *subbuf)
1502 {
1503 int ret;
1504
1505 ret = extract_common_subbuffer_info(stream, subbuf);
1506 if (ret) {
1507 goto end;
1508 }
1509
1510 ret = kernctl_get_packet_size(
1511 stream->wait_fd, &subbuf->info.data.packet_size);
1512 if (ret < 0) {
1513 PERROR("Failed to get sub-buffer packet size");
1514 goto end;
1515 }
1516
1517 ret = kernctl_get_content_size(
1518 stream->wait_fd, &subbuf->info.data.content_size);
1519 if (ret < 0) {
1520 PERROR("Failed to get sub-buffer content size");
1521 goto end;
1522 }
1523
1524 ret = kernctl_get_timestamp_begin(
1525 stream->wait_fd, &subbuf->info.data.timestamp_begin);
1526 if (ret < 0) {
1527 PERROR("Failed to get sub-buffer begin timestamp");
1528 goto end;
1529 }
1530
1531 ret = kernctl_get_timestamp_end(
1532 stream->wait_fd, &subbuf->info.data.timestamp_end);
1533 if (ret < 0) {
1534 PERROR("Failed to get sub-buffer end timestamp");
1535 goto end;
1536 }
1537
1538 ret = kernctl_get_events_discarded(
1539 stream->wait_fd, &subbuf->info.data.events_discarded);
1540 if (ret) {
1541 PERROR("Failed to get sub-buffer events discarded count");
1542 goto end;
1543 }
1544
1545 ret = kernctl_get_sequence_number(stream->wait_fd,
1546 &subbuf->info.data.sequence_number.value);
1547 if (ret) {
1548 /* May not be supported by older LTTng-modules. */
1549 if (ret != -ENOTTY) {
1550 PERROR("Failed to get sub-buffer sequence number");
1551 goto end;
1552 }
1553 } else {
1554 subbuf->info.data.sequence_number.is_set = true;
1555 }
1556
1557 ret = kernctl_get_stream_id(
1558 stream->wait_fd, &subbuf->info.data.stream_id);
1559 if (ret < 0) {
1560 PERROR("Failed to get stream id");
1561 goto end;
1562 }
1563
1564 ret = kernctl_get_instance_id(stream->wait_fd,
1565 &subbuf->info.data.stream_instance_id.value);
1566 if (ret) {
1567 /* May not be supported by older LTTng-modules. */
1568 if (ret != -ENOTTY) {
1569 PERROR("Failed to get stream instance id");
1570 goto end;
1571 }
1572 } else {
1573 subbuf->info.data.stream_instance_id.is_set = true;
1574 }
1575 end:
1576 return ret;
1577 }
1578
1579 static
1580 enum get_next_subbuffer_status get_subbuffer_common(
1581 struct lttng_consumer_stream *stream,
1582 struct stream_subbuffer *subbuffer)
1583 {
1584 int ret;
1585 enum get_next_subbuffer_status status;
1586
1587 ret = kernctl_get_next_subbuf(stream->wait_fd);
1588 switch (ret) {
1589 case 0:
1590 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1591 break;
1592 case -ENODATA:
1593 case -EAGAIN:
1594 /*
1595 * The caller only expects -ENODATA when there is no data to
1596 * read, but the kernel tracer returns -EAGAIN when there is
1597 * currently no data for a non-finalized stream, and -ENODATA
1598 * when there is no data for a finalized stream. Those can be
1599 * combined into a -ENODATA return value.
1600 */
1601 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1602 goto end;
1603 default:
1604 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1605 goto end;
1606 }
1607
1608 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
1609 stream, subbuffer);
1610 if (ret) {
1611 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1612 }
1613 end:
1614 return status;
1615 }
1616
1617 static
1618 enum get_next_subbuffer_status get_next_subbuffer_splice(
1619 struct lttng_consumer_stream *stream,
1620 struct stream_subbuffer *subbuffer)
1621 {
1622 const enum get_next_subbuffer_status status =
1623 get_subbuffer_common(stream, subbuffer);
1624
1625 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1626 goto end;
1627 }
1628
1629 subbuffer->buffer.fd = stream->wait_fd;
1630 end:
1631 return status;
1632 }
1633
1634 static
1635 enum get_next_subbuffer_status get_next_subbuffer_mmap(
1636 struct lttng_consumer_stream *stream,
1637 struct stream_subbuffer *subbuffer)
1638 {
1639 int ret;
1640 enum get_next_subbuffer_status status;
1641 const char *addr;
1642
1643 status = get_subbuffer_common(stream, subbuffer);
1644 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1645 goto end;
1646 }
1647
1648 ret = get_current_subbuf_addr(stream, &addr);
1649 if (ret) {
1650 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1651 goto end;
1652 }
1653
1654 subbuffer->buffer.buffer = lttng_buffer_view_init(
1655 addr, 0, subbuffer->info.data.padded_subbuf_size);
1656 end:
1657 return status;
1658 }
1659
/*
 * Get the next metadata sub-buffer while also querying the tracer about the
 * coherency ("parseability") of the metadata produced so far. Installed as
 * the get_next_subbuffer operation of live metadata streams when supported
 * by the kernel tracer.
 */
static
enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
		struct stream_subbuffer *subbuffer)
{
	int ret;
	const char *addr;
	bool coherent;
	enum get_next_subbuffer_status status;

	ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
			&coherent);
	if (ret) {
		goto end;
	}

	ret = stream->read_subbuffer_ops.extract_subbuffer_info(
			stream, subbuffer);
	if (ret) {
		goto end;
	}

	/* Record the tracer-reported coherency state of this packet. */
	LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);

	ret = get_current_subbuf_addr(stream, &addr);
	if (ret) {
		goto end;
	}

	/* Expose the sub-buffer contents as a view over the mapped memory. */
	subbuffer->buffer.buffer = lttng_buffer_view_init(
			addr, 0, subbuffer->info.data.padded_subbuf_size);
	DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
			subbuffer->info.metadata.padded_subbuf_size,
			coherent ? "true" : "false");
end:
	/* Map the kernctl return code to a get_next_subbuffer status. */
	switch (ret) {
	case 0:
		status = GET_NEXT_SUBBUFFER_STATUS_OK;
		break;
	case -ENODATA:
	case -EAGAIN:
		/*
		 * The caller only expects -ENODATA when there is no data to
		 * read, but the kernel tracer returns -EAGAIN when there is
		 * currently no data for a non-finalized stream, and -ENODATA
		 * when there is no data for a finalized stream. Those can be
		 * combined into a -ENODATA return value.
		 */
		status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
		break;
	default:
		status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
		break;
	}

	return status;
}
1722
1723 static
1724 int put_next_subbuffer(struct lttng_consumer_stream *stream,
1725 struct stream_subbuffer *subbuffer __attribute__((unused)))
1726 {
1727 const int ret = kernctl_put_next_subbuf(stream->wait_fd);
1728
1729 if (ret) {
1730 if (ret == -EFAULT) {
1731 PERROR("Error in unreserving sub buffer");
1732 } else if (ret == -EIO) {
1733 /* Should never happen with newer LTTng versions */
1734 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
1735 }
1736 }
1737
1738 return ret;
1739 }
1740
1741 static
1742 bool is_get_next_check_metadata_available(int tracer_fd)
1743 {
1744 const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
1745 const bool available = ret != -ENOTTY;
1746
1747 if (ret == 0) {
1748 /* get succeeded, make sure to put the subbuffer. */
1749 kernctl_put_subbuf(tracer_fd);
1750 }
1751
1752 return available;
1753 }
1754
1755 static
1756 int signal_metadata(struct lttng_consumer_stream *stream,
1757 struct lttng_consumer_local_data *ctx __attribute__((unused)))
1758 {
1759 ASSERT_LOCKED(stream->metadata_rdv_lock);
1760 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
1761 }
1762
1763 static
1764 int lttng_kconsumer_set_stream_ops(
1765 struct lttng_consumer_stream *stream)
1766 {
1767 int ret = 0;
1768
1769 if (stream->metadata_flag && stream->chan->is_live) {
1770 DBG("Attempting to enable metadata bucketization for live consumers");
1771 if (is_get_next_check_metadata_available(stream->wait_fd)) {
1772 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1773 stream->read_subbuffer_ops.get_next_subbuffer =
1774 get_next_subbuffer_metadata_check;
1775 ret = consumer_stream_enable_metadata_bucketization(
1776 stream);
1777 if (ret) {
1778 goto end;
1779 }
1780 } else {
1781 /*
1782 * The kernel tracer version is too old to indicate
1783 * when the metadata stream has reached a "coherent"
1784 * (parseable) point.
1785 *
1786 * This means that a live viewer may see an incoherent
1787 * sequence of metadata and fail to parse it.
1788 */
1789 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1790 metadata_bucket_destroy(stream->metadata_bucket);
1791 stream->metadata_bucket = NULL;
1792 }
1793
1794 stream->read_subbuffer_ops.on_sleep = signal_metadata;
1795 }
1796
1797 if (!stream->read_subbuffer_ops.get_next_subbuffer) {
1798 if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
1799 stream->read_subbuffer_ops.get_next_subbuffer =
1800 get_next_subbuffer_mmap;
1801 } else {
1802 stream->read_subbuffer_ops.get_next_subbuffer =
1803 get_next_subbuffer_splice;
1804 }
1805 }
1806
1807 if (stream->metadata_flag) {
1808 stream->read_subbuffer_ops.extract_subbuffer_info =
1809 extract_metadata_subbuffer_info;
1810 } else {
1811 stream->read_subbuffer_ops.extract_subbuffer_info =
1812 extract_data_subbuffer_info;
1813 if (stream->chan->is_live) {
1814 stream->read_subbuffer_ops.send_live_beacon =
1815 consumer_flush_kernel_index;
1816 }
1817 }
1818
1819 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
1820 end:
1821 return ret;
1822 }
1823
1824 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1825 {
1826 int ret;
1827
1828 LTTNG_ASSERT(stream);
1829
1830 /*
1831 * Don't create anything if this is set for streaming or if there is
1832 * no current trace chunk on the parent channel.
1833 */
1834 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
1835 stream->chan->trace_chunk) {
1836 ret = consumer_stream_create_output_files(stream, true);
1837 if (ret) {
1838 goto error;
1839 }
1840 }
1841
1842 if (stream->output == LTTNG_EVENT_MMAP) {
1843 /* get the len of the mmap region */
1844 unsigned long mmap_len;
1845
1846 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
1847 if (ret != 0) {
1848 PERROR("kernctl_get_mmap_len");
1849 goto error_close_fd;
1850 }
1851 stream->mmap_len = (size_t) mmap_len;
1852
1853 stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
1854 MAP_PRIVATE, stream->wait_fd, 0);
1855 if (stream->mmap_base == MAP_FAILED) {
1856 PERROR("Error mmaping");
1857 ret = -1;
1858 goto error_close_fd;
1859 }
1860 }
1861
1862 ret = lttng_kconsumer_set_stream_ops(stream);
1863 if (ret) {
1864 goto error_close_fd;
1865 }
1866
1867 /* we return 0 to let the library handle the FD internally */
1868 return 0;
1869
1870 error_close_fd:
1871 if (stream->out_fd >= 0) {
1872 int err;
1873
1874 err = close(stream->out_fd);
1875 LTTNG_ASSERT(!err);
1876 stream->out_fd = -1;
1877 }
1878 error:
1879 return ret;
1880 }
1881
1882 /*
1883 * Check if data is still being extracted from the buffers for a specific
1884 * stream. Consumer data lock MUST be acquired before calling this function
1885 * and the stream lock.
1886 *
1887 * Return 1 if the traced data are still getting read else 0 meaning that the
1888 * data is available for trace viewer reading.
1889 */
1890 int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
1891 {
1892 int ret;
1893
1894 LTTNG_ASSERT(stream);
1895
1896 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1897 ret = 0;
1898 goto end;
1899 }
1900
1901 ret = kernctl_get_next_subbuf(stream->wait_fd);
1902 if (ret == 0) {
1903 /* There is still data so let's put back this subbuffer. */
1904 ret = kernctl_put_subbuf(stream->wait_fd);
1905 LTTNG_ASSERT(ret == 0);
1906 ret = 1; /* Data is pending */
1907 goto end;
1908 }
1909
1910 /* Data is NOT pending and ready to be read. */
1911 ret = 0;
1912
1913 end:
1914 return ret;
1915 }
This page took 0.072964 seconds and 5 git commands to generate.