Propagate trace format all the way to the consumer
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
CommitLineData
3bd1e081 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3bd1e081 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
3bd1e081 7 *
3bd1e081
MD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
3bd1e081
MD
11#include <poll.h>
12#include <pthread.h>
13#include <stdlib.h>
14#include <string.h>
15#include <sys/mman.h>
16#include <sys/socket.h>
17#include <sys/types.h>
77c7c900 18#include <inttypes.h>
3bd1e081 19#include <unistd.h>
dbb5dfe6 20#include <sys/stat.h>
f5ba75b4 21#include <stdint.h>
3bd1e081 22
c9e313bc
SM
23#include <bin/lttng-consumerd/health-consumerd.hpp>
24#include <common/common.hpp>
25#include <common/kernel-ctl/kernel-ctl.hpp>
26#include <common/sessiond-comm/sessiond-comm.hpp>
27#include <common/sessiond-comm/relayd.hpp>
28#include <common/compat/fcntl.hpp>
29#include <common/compat/endian.hpp>
30#include <common/pipe.hpp>
31#include <common/relayd/relayd.hpp>
32#include <common/utils.hpp>
33#include <common/consumer/consumer-stream.hpp>
34#include <common/index/index.hpp>
35#include <common/consumer/consumer-timer.hpp>
36#include <common/optional.hpp>
37#include <common/buffer-view.hpp>
38#include <common/consumer/consumer.hpp>
39#include <common/consumer/metadata-bucket.hpp>
0857097f 40
c9e313bc 41#include "kernel-consumer.hpp"
3bd1e081 42
fa29bfbf 43extern struct lttng_consumer_global_data the_consumer_data;
3bd1e081 44extern int consumer_poll_timeout;
3bd1e081 45
3bd1e081
MD
46/*
47 * Take a snapshot for a specific fd
48 *
49 * Returns 0 on success, < 0 on error
50 */
ffe60014 51int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081
MD
52{
53 int ret = 0;
54 int infd = stream->wait_fd;
55
56 ret = kernctl_snapshot(infd);
d2d2f190
JD
57 /*
58 * -EAGAIN is not an error, it just means that there is no data to
59 * be read.
60 */
61 if (ret != 0 && ret != -EAGAIN) {
5a510c9f 62 PERROR("Getting sub-buffer snapshot.");
3bd1e081
MD
63 }
64
65 return ret;
66}
67
e9404c27
JG
68/*
69 * Sample consumed and produced positions for a specific fd.
70 *
71 * Returns 0 on success, < 0 on error.
72 */
73int lttng_kconsumer_sample_snapshot_positions(
74 struct lttng_consumer_stream *stream)
75{
a0377dfe 76 LTTNG_ASSERT(stream);
e9404c27
JG
77
78 return kernctl_snapshot_sample_positions(stream->wait_fd);
79}
80
3bd1e081
MD
81/*
82 * Get the produced position
83 *
84 * Returns 0 on success, < 0 on error
85 */
ffe60014 86int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
3bd1e081
MD
87 unsigned long *pos)
88{
89 int ret;
90 int infd = stream->wait_fd;
91
92 ret = kernctl_snapshot_get_produced(infd, pos);
93 if (ret != 0) {
5a510c9f 94 PERROR("kernctl_snapshot_get_produced");
3bd1e081
MD
95 }
96
97 return ret;
98}
99
07b86b52
JD
100/*
101 * Get the consumerd position
102 *
103 * Returns 0 on success, < 0 on error
104 */
105int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
106 unsigned long *pos)
107{
108 int ret;
109 int infd = stream->wait_fd;
110
111 ret = kernctl_snapshot_get_consumed(infd, pos);
112 if (ret != 0) {
5a510c9f 113 PERROR("kernctl_snapshot_get_consumed");
07b86b52
JD
114 }
115
116 return ret;
117}
118
128708c3
JG
119static
120int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
121 const char **addr)
122{
123 int ret;
124 unsigned long mmap_offset;
97535efa 125 const char *mmap_base = (const char *) stream->mmap_base;
128708c3
JG
126
127 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
128 if (ret < 0) {
129 PERROR("Failed to get mmap read offset");
130 goto error;
131 }
132
133 *addr = mmap_base + mmap_offset;
134error:
135 return ret;
136}
137
07b86b52
JD
138/*
139 * Take a snapshot of all the stream of a channel
3eb928aa 140 * RCU read-side lock must be held across this function to ensure existence of
947bd097 141 * channel.
07b86b52
JD
142 *
143 * Returns 0 on success, < 0 on error
144 */
f72bb42f
JG
145static int lttng_kconsumer_snapshot_channel(
146 struct lttng_consumer_channel *channel,
147 uint64_t key, char *path, uint64_t relayd_id,
f46376a1 148 uint64_t nb_packets_per_stream)
07b86b52
JD
149{
150 int ret;
07b86b52
JD
151 struct lttng_consumer_stream *stream;
152
6a00837f 153 DBG("Kernel consumer snapshot channel %" PRIu64, key);
07b86b52 154
947bd097
JR
155 /* Prevent channel modifications while we perform the snapshot.*/
156 pthread_mutex_lock(&channel->lock);
157
07b86b52
JD
158 rcu_read_lock();
159
07b86b52
JD
160 /* Splice is not supported yet for channel snapshot. */
161 if (channel->output != CONSUMER_CHANNEL_MMAP) {
9381314c
JG
162 ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
163 channel->name);
07b86b52
JD
164 ret = -1;
165 goto end;
166 }
167
10a50311 168 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
923333cd 169 unsigned long consumed_pos, produced_pos;
9ce5646a
MD
170
171 health_code_update();
172
07b86b52
JD
173 /*
174 * Lock stream because we are about to change its state.
175 */
176 pthread_mutex_lock(&stream->lock);
177
a0377dfe 178 LTTNG_ASSERT(channel->trace_chunk);
d2956687
JG
179 if (!lttng_trace_chunk_get(channel->trace_chunk)) {
180 /*
181 * Can't happen barring an internal error as the channel
182 * holds a reference to the trace chunk.
183 */
184 ERR("Failed to acquire reference to channel's trace chunk");
185 ret = -1;
186 goto end_unlock;
187 }
a0377dfe 188 LTTNG_ASSERT(!stream->trace_chunk);
d2956687
JG
189 stream->trace_chunk = channel->trace_chunk;
190
29decac3
DG
191 /*
192 * Assign the received relayd ID so we can use it for streaming. The streams
193 * are not visible to anyone so this is OK to change it.
194 */
07b86b52
JD
195 stream->net_seq_idx = relayd_id;
196 channel->relayd_id = relayd_id;
197 if (relayd_id != (uint64_t) -1ULL) {
10a50311 198 ret = consumer_send_relayd_stream(stream, path);
07b86b52
JD
199 if (ret < 0) {
200 ERR("sending stream to relayd");
201 goto end_unlock;
202 }
07b86b52 203 } else {
d2956687
JG
204 ret = consumer_stream_create_output_files(stream,
205 false);
07b86b52 206 if (ret < 0) {
07b86b52
JD
207 goto end_unlock;
208 }
d2956687
JG
209 DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
210 stream->key);
07b86b52
JD
211 }
212
f22dd891 213 ret = kernctl_buffer_flush_empty(stream->wait_fd);
07b86b52 214 if (ret < 0) {
f22dd891
MD
215 /*
216 * Doing a buffer flush which does not take into
217 * account empty packets. This is not perfect
218 * for stream intersection, but required as a
219 * fall-back when "flush_empty" is not
220 * implemented by lttng-modules.
221 */
222 ret = kernctl_buffer_flush(stream->wait_fd);
223 if (ret < 0) {
224 ERR("Failed to flush kernel stream");
225 goto end_unlock;
226 }
07b86b52
JD
227 goto end_unlock;
228 }
229
230 ret = lttng_kconsumer_take_snapshot(stream);
231 if (ret < 0) {
232 ERR("Taking kernel snapshot");
233 goto end_unlock;
234 }
235
236 ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
237 if (ret < 0) {
238 ERR("Produced kernel snapshot position");
239 goto end_unlock;
240 }
241
242 ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
243 if (ret < 0) {
244 ERR("Consumerd kernel snapshot position");
245 goto end_unlock;
246 }
247
d07ceecd
MD
248 consumed_pos = consumer_get_consume_start_pos(consumed_pos,
249 produced_pos, nb_packets_per_stream,
250 stream->max_sb_size);
5c786ded 251
9377d830 252 while ((long) (consumed_pos - produced_pos) < 0) {
07b86b52
JD
253 ssize_t read_len;
254 unsigned long len, padded_len;
128708c3 255 const char *subbuf_addr;
fd424d99 256 struct lttng_buffer_view subbuf_view;
07b86b52 257
9ce5646a 258 health_code_update();
07b86b52
JD
259 DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
260
261 ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
262 if (ret < 0) {
32af2c95 263 if (ret != -EAGAIN) {
07b86b52
JD
264 PERROR("kernctl_get_subbuf snapshot");
265 goto end_unlock;
266 }
267 DBG("Kernel consumer get subbuf failed. Skipping it.");
268 consumed_pos += stream->max_sb_size;
ddc93ee4 269 stream->chan->lost_packets++;
07b86b52
JD
270 continue;
271 }
272
273 ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
274 if (ret < 0) {
275 ERR("Snapshot kernctl_get_subbuf_size");
29decac3 276 goto error_put_subbuf;
07b86b52
JD
277 }
278
279 ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
280 if (ret < 0) {
281 ERR("Snapshot kernctl_get_padded_subbuf_size");
29decac3 282 goto error_put_subbuf;
07b86b52
JD
283 }
284
128708c3
JG
285 ret = get_current_subbuf_addr(stream, &subbuf_addr);
286 if (ret) {
287 goto error_put_subbuf;
288 }
289
fd424d99
JG
290 subbuf_view = lttng_buffer_view_init(
291 subbuf_addr, 0, padded_len);
f5ba75b4 292 read_len = lttng_consumer_on_read_subbuffer_mmap(
fd424d99 293 stream, &subbuf_view,
6f9449c2 294 padded_len - len);
07b86b52 295 /*
29decac3
DG
296 * We write the padded len in local tracefiles but the data len
297 * when using a relay. Display the error but continue processing
298 * to try to release the subbuffer.
07b86b52
JD
299 */
300 if (relayd_id != (uint64_t) -1ULL) {
301 if (read_len != len) {
302 ERR("Error sending to the relay (ret: %zd != len: %lu)",
303 read_len, len);
304 }
305 } else {
306 if (read_len != padded_len) {
307 ERR("Error writing to tracefile (ret: %zd != len: %lu)",
308 read_len, padded_len);
309 }
310 }
311
312 ret = kernctl_put_subbuf(stream->wait_fd);
313 if (ret < 0) {
314 ERR("Snapshot kernctl_put_subbuf");
315 goto end_unlock;
316 }
317 consumed_pos += stream->max_sb_size;
318 }
319
320 if (relayd_id == (uint64_t) -1ULL) {
fdf9986c
MD
321 if (stream->out_fd >= 0) {
322 ret = close(stream->out_fd);
323 if (ret < 0) {
324 PERROR("Kernel consumer snapshot close out_fd");
325 goto end_unlock;
326 }
327 stream->out_fd = -1;
07b86b52 328 }
07b86b52
JD
329 } else {
330 close_relayd_stream(stream);
331 stream->net_seq_idx = (uint64_t) -1ULL;
332 }
d2956687
JG
333 lttng_trace_chunk_put(stream->trace_chunk);
334 stream->trace_chunk = NULL;
07b86b52
JD
335 pthread_mutex_unlock(&stream->lock);
336 }
337
338 /* All good! */
339 ret = 0;
340 goto end;
341
29decac3
DG
342error_put_subbuf:
343 ret = kernctl_put_subbuf(stream->wait_fd);
344 if (ret < 0) {
345 ERR("Snapshot kernctl_put_subbuf error path");
346 }
07b86b52
JD
347end_unlock:
348 pthread_mutex_unlock(&stream->lock);
349end:
350 rcu_read_unlock();
947bd097 351 pthread_mutex_unlock(&channel->lock);
07b86b52
JD
352 return ret;
353}
354
355/*
356 * Read the whole metadata available for a snapshot.
3eb928aa 357 * RCU read-side lock must be held across this function to ensure existence of
947bd097 358 * metadata_channel.
07b86b52
JD
359 *
360 * Returns 0 on success, < 0 on error
361 */
d2956687
JG
362static int lttng_kconsumer_snapshot_metadata(
363 struct lttng_consumer_channel *metadata_channel,
3eb928aa
MD
364 uint64_t key, char *path, uint64_t relayd_id,
365 struct lttng_consumer_local_data *ctx)
07b86b52 366{
d771f832
DG
367 int ret, use_relayd = 0;
368 ssize_t ret_read;
07b86b52 369 struct lttng_consumer_stream *metadata_stream;
d771f832 370
a0377dfe 371 LTTNG_ASSERT(ctx);
07b86b52
JD
372
373 DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
374 key, path);
375
376 rcu_read_lock();
377
07b86b52 378 metadata_stream = metadata_channel->metadata_stream;
a0377dfe 379 LTTNG_ASSERT(metadata_stream);
d2956687 380
947bd097 381 metadata_stream->read_subbuffer_ops.lock(metadata_stream);
a0377dfe
FD
382 LTTNG_ASSERT(metadata_channel->trace_chunk);
383 LTTNG_ASSERT(metadata_stream->trace_chunk);
07b86b52 384
d771f832 385 /* Flag once that we have a valid relayd for the stream. */
e2039c7a 386 if (relayd_id != (uint64_t) -1ULL) {
d771f832
DG
387 use_relayd = 1;
388 }
389
390 if (use_relayd) {
10a50311 391 ret = consumer_send_relayd_stream(metadata_stream, path);
e2039c7a 392 if (ret < 0) {
fa27abe8 393 goto error_snapshot;
e2039c7a 394 }
e2039c7a 395 } else {
d2956687
JG
396 ret = consumer_stream_create_output_files(metadata_stream,
397 false);
e2039c7a 398 if (ret < 0) {
fa27abe8 399 goto error_snapshot;
e2039c7a 400 }
07b86b52 401 }
07b86b52 402
d771f832 403 do {
9ce5646a
MD
404 health_code_update();
405
6f9449c2 406 ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
d771f832 407 if (ret_read < 0) {
6e5e3c51
MD
408 ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
409 ret_read);
410 ret = ret_read;
411 goto error_snapshot;
07b86b52 412 }
6e5e3c51 413 } while (ret_read > 0);
07b86b52 414
d771f832
DG
415 if (use_relayd) {
416 close_relayd_stream(metadata_stream);
417 metadata_stream->net_seq_idx = (uint64_t) -1ULL;
418 } else {
fdf9986c
MD
419 if (metadata_stream->out_fd >= 0) {
420 ret = close(metadata_stream->out_fd);
421 if (ret < 0) {
422 PERROR("Kernel consumer snapshot metadata close out_fd");
423 /*
424 * Don't go on error here since the snapshot was successful at this
425 * point but somehow the close failed.
426 */
427 }
428 metadata_stream->out_fd = -1;
d2956687
JG
429 lttng_trace_chunk_put(metadata_stream->trace_chunk);
430 metadata_stream->trace_chunk = NULL;
e2039c7a 431 }
e2039c7a
JD
432 }
433
07b86b52 434 ret = 0;
fa27abe8 435error_snapshot:
947bd097 436 metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
cf53a8a6
JD
437 consumer_stream_destroy(metadata_stream, NULL);
438 metadata_channel->metadata_stream = NULL;
07b86b52
JD
439 rcu_read_unlock();
440 return ret;
441}
442
1803a064
MD
443/*
444 * Receive command from session daemon and process it.
445 *
446 * Return 1 on success else a negative value or 0.
447 */
3bd1e081
MD
448int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
449 int sock, struct pollfd *consumer_sockpoll)
450{
0c5b3718 451 int ret_func;
0c759fc9 452 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3bd1e081
MD
453 struct lttcomm_consumer_msg msg;
454
9ce5646a
MD
455 health_code_update();
456
0c5b3718
SM
457 {
458 ssize_t ret_recv;
459
460 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
461 if (ret_recv != sizeof(msg)) {
462 if (ret_recv > 0) {
463 lttng_consumer_send_error(ctx,
464 LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
465 ret_recv = -1;
466 }
467 return ret_recv;
1803a064 468 }
3bd1e081 469 }
9ce5646a
MD
470
471 health_code_update();
472
84382d49 473 /* Deprecated command */
a0377dfe 474 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
3bd1e081 475
9ce5646a
MD
476 health_code_update();
477
b0b335c8
MD
478 /* relayd needs RCU read-side protection */
479 rcu_read_lock();
480
3bd1e081 481 switch (msg.cmd_type) {
00e2e675
DG
482 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
483 {
4222116f
JR
484 uint32_t major = msg.u.relayd_sock.major;
485 uint32_t minor = msg.u.relayd_sock.minor;
486 enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto)
487 msg.u.relayd_sock.relayd_socket_protocol;
488
f50f23d9 489 /* Session daemon status message are handled in the following call. */
2527bf85 490 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
4222116f
JR
491 msg.u.relayd_sock.type, ctx, sock,
492 consumer_sockpoll, msg.u.relayd_sock.session_id,
493 msg.u.relayd_sock.relayd_session_id, major,
494 minor, protocol);
00e2e675
DG
495 goto end_nosignal;
496 }
3bd1e081
MD
497 case LTTNG_CONSUMER_ADD_CHANNEL:
498 {
499 struct lttng_consumer_channel *new_channel;
afbf29db 500 int ret_send_status, ret_add_channel = 0;
d2956687 501 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
3bd1e081 502
9ce5646a
MD
503 health_code_update();
504
f50f23d9 505 /* First send a status message before receiving the fds. */
0c5b3718
SM
506 ret_send_status = consumer_send_status_msg(sock, ret_code);
507 if (ret_send_status < 0) {
f50f23d9 508 /* Somehow, the session daemon is not responding anymore. */
1803a064 509 goto error_fatal;
f50f23d9 510 }
9ce5646a
MD
511
512 health_code_update();
513
d88aee68 514 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
3bd1e081 515 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
d2956687 516 msg.u.channel.session_id,
8696b40b
JR
517 msg.u.channel.chunk_id.is_set ? &chunk_id : NULL,
518 msg.u.channel.pathname, msg.u.channel.name, msg.u.channel.relayd_id,
519 msg.u.channel.output, msg.u.channel.tracefile_size,
520 msg.u.channel.tracefile_count, 0, msg.u.channel.monitor,
521 msg.u.channel.live_timer_interval, msg.u.channel.is_live, NULL,
522 NULL, msg.u.channel.trace_format);
3bd1e081 523 if (new_channel == NULL) {
f73fabfd 524 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
3bd1e081
MD
525 goto end_nosignal;
526 }
ffe60014 527 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
95a1109b
JD
528 switch (msg.u.channel.output) {
529 case LTTNG_EVENT_SPLICE:
530 new_channel->output = CONSUMER_CHANNEL_SPLICE;
531 break;
532 case LTTNG_EVENT_MMAP:
533 new_channel->output = CONSUMER_CHANNEL_MMAP;
534 break;
535 default:
536 ERR("Channel output unknown %d", msg.u.channel.output);
537 goto end_nosignal;
538 }
ffe60014
DG
539
540 /* Translate and save channel type. */
541 switch (msg.u.channel.type) {
542 case CONSUMER_CHANNEL_TYPE_DATA:
543 case CONSUMER_CHANNEL_TYPE_METADATA:
97535efa 544 new_channel->type = (consumer_channel_type) msg.u.channel.type;
ffe60014
DG
545 break;
546 default:
a0377dfe 547 abort();
ffe60014
DG
548 goto end_nosignal;
549 };
550
9ce5646a
MD
551 health_code_update();
552
3bd1e081 553 if (ctx->on_recv_channel != NULL) {
0c5b3718
SM
554 int ret_recv_channel =
555 ctx->on_recv_channel(new_channel);
556 if (ret_recv_channel == 0) {
557 ret_add_channel = consumer_add_channel(
558 new_channel, ctx);
559 } else if (ret_recv_channel < 0) {
3bd1e081
MD
560 goto end_nosignal;
561 }
562 } else {
0c5b3718
SM
563 ret_add_channel =
564 consumer_add_channel(new_channel, ctx);
3bd1e081 565 }
0c5b3718
SM
566 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
567 !ret_add_channel) {
e9404c27
JG
568 int monitor_start_ret;
569
570 DBG("Consumer starting monitor timer");
94d49140
JD
571 consumer_timer_live_start(new_channel,
572 msg.u.channel.live_timer_interval);
e9404c27
JG
573 monitor_start_ret = consumer_timer_monitor_start(
574 new_channel,
575 msg.u.channel.monitor_timer_interval);
576 if (monitor_start_ret < 0) {
577 ERR("Starting channel monitoring timer failed");
578 goto end_nosignal;
579 }
94d49140 580 }
e43c41c5 581
9ce5646a
MD
582 health_code_update();
583
e43c41c5 584 /* If we received an error in add_channel, we need to report it. */
0c5b3718
SM
585 if (ret_add_channel < 0) {
586 ret_send_status = consumer_send_status_msg(
587 sock, ret_add_channel);
588 if (ret_send_status < 0) {
1803a064
MD
589 goto error_fatal;
590 }
e43c41c5
JD
591 goto end_nosignal;
592 }
593
3bd1e081
MD
594 goto end_nosignal;
595 }
596 case LTTNG_CONSUMER_ADD_STREAM:
597 {
dae10966
DG
598 int fd;
599 struct lttng_pipe *stream_pipe;
00e2e675 600 struct lttng_consumer_stream *new_stream;
ffe60014 601 struct lttng_consumer_channel *channel;
c80048c6 602 int alloc_ret = 0;
0c5b3718
SM
603 int ret_send_status, ret_poll, ret_get_max_subbuf_size;
604 ssize_t ret_pipe_write, ret_recv;
3bd1e081 605
ffe60014
DG
606 /*
607 * Get stream's channel reference. Needed when adding the stream to the
608 * global hash table.
609 */
610 channel = consumer_find_channel(msg.u.stream.channel_key);
611 if (!channel) {
612 /*
613 * We could not find the channel. Can happen if cpu hotplug
614 * happens while tearing down.
615 */
d88aee68 616 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
e462382a 617 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
ffe60014
DG
618 }
619
9ce5646a
MD
620 health_code_update();
621
f50f23d9 622 /* First send a status message before receiving the fds. */
0c5b3718
SM
623 ret_send_status = consumer_send_status_msg(sock, ret_code);
624 if (ret_send_status < 0) {
d771f832 625 /* Somehow, the session daemon is not responding anymore. */
c5c7998f 626 goto error_add_stream_fatal;
1803a064 627 }
9ce5646a
MD
628
629 health_code_update();
630
0c759fc9 631 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
d771f832 632 /* Channel was not found. */
c5c7998f 633 goto error_add_stream_nosignal;
f50f23d9
DG
634 }
635
d771f832 636 /* Blocking call */
9ce5646a 637 health_poll_entry();
0c5b3718 638 ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
9ce5646a 639 health_poll_exit();
0c5b3718 640 if (ret_poll) {
c5c7998f 641 goto error_add_stream_fatal;
3bd1e081 642 }
00e2e675 643
9ce5646a
MD
644 health_code_update();
645
00e2e675 646 /* Get stream file descriptor from socket */
0c5b3718
SM
647 ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
648 if (ret_recv != sizeof(fd)) {
f73fabfd 649 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
0c5b3718 650 ret_func = ret_recv;
c5c7998f 651 goto end;
3bd1e081 652 }
3bd1e081 653
9ce5646a
MD
654 health_code_update();
655
f50f23d9
DG
656 /*
657 * Send status code to session daemon only if the recv works. If the
658 * above recv() failed, the session daemon is notified through the
659 * error socket and the teardown is eventually done.
660 */
0c5b3718
SM
661 ret_send_status = consumer_send_status_msg(sock, ret_code);
662 if (ret_send_status < 0) {
f50f23d9 663 /* Somehow, the session daemon is not responding anymore. */
c5c7998f 664 goto error_add_stream_nosignal;
f50f23d9
DG
665 }
666
9ce5646a
MD
667 health_code_update();
668
d2956687 669 pthread_mutex_lock(&channel->lock);
8696b40b
JR
670 new_stream = consumer_stream_create(channel, channel->key, fd, channel->name,
671 channel->relayd_id, channel->session_id, channel->trace_chunk,
672 msg.u.stream.cpu, &alloc_ret, channel->type, channel->monitor,
673 channel->trace_format);
3bd1e081 674 if (new_stream == NULL) {
c80048c6
MD
675 switch (alloc_ret) {
676 case -ENOMEM:
677 case -EINVAL:
678 default:
679 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
680 break;
c80048c6 681 }
d2956687 682 pthread_mutex_unlock(&channel->lock);
c5c7998f 683 goto error_add_stream_nosignal;
3bd1e081 684 }
d771f832 685
ffe60014 686 new_stream->wait_fd = fd;
0c5b3718
SM
687 ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
688 new_stream->wait_fd, &new_stream->max_sb_size);
689 if (ret_get_max_subbuf_size < 0) {
d05185fa
JG
690 pthread_mutex_unlock(&channel->lock);
691 ERR("Failed to get kernel maximal subbuffer size");
c5c7998f 692 goto error_add_stream_nosignal;
d05185fa
JG
693 }
694
d9a2e16e
JD
695 consumer_stream_update_channel_attributes(new_stream,
696 channel);
00e2e675 697
a0c83db9
DG
698 /*
699 * We've just assigned the channel to the stream so increment the
07b86b52
JD
700 * refcount right now. We don't need to increment the refcount for
701 * streams in no monitor because we handle manually the cleanup of
702 * those. It is very important to make sure there is NO prior
703 * consumer_del_stream() calls or else the refcount will be unbalanced.
a0c83db9 704 */
07b86b52
JD
705 if (channel->monitor) {
706 uatomic_inc(&new_stream->chan->refcount);
707 }
9d9353f9 708
fb3a43a9
DG
709 /*
710 * The buffer flush is done on the session daemon side for the kernel
711 * so no need for the stream "hangup_flush_done" variable to be
712 * tracked. This is important for a kernel stream since we don't rely
713 * on the flush state of the stream to read data. It's not the case for
714 * user space tracing.
715 */
716 new_stream->hangup_flush_done = 0;
717
9ce5646a
MD
718 health_code_update();
719
d2956687 720 pthread_mutex_lock(&new_stream->lock);
633d0084 721 if (ctx->on_recv_stream) {
0c5b3718
SM
722 int ret_recv_stream = ctx->on_recv_stream(new_stream);
723 if (ret_recv_stream < 0) {
d2956687
JG
724 pthread_mutex_unlock(&new_stream->lock);
725 pthread_mutex_unlock(&channel->lock);
d771f832 726 consumer_stream_free(new_stream);
c5c7998f 727 goto error_add_stream_nosignal;
fb3a43a9 728 }
633d0084 729 }
9ce5646a
MD
730 health_code_update();
731
07b86b52
JD
732 if (new_stream->metadata_flag) {
733 channel->metadata_stream = new_stream;
734 }
735
2bba9e53
DG
736 /* Do not monitor this stream. */
737 if (!channel->monitor) {
5eecee74 738 DBG("Kernel consumer add stream %s in no monitor mode with "
6dc3064a 739 "relayd id %" PRIu64, new_stream->name,
5eecee74 740 new_stream->net_seq_idx);
10a50311 741 cds_list_add(&new_stream->send_node, &channel->streams.head);
d2956687
JG
742 pthread_mutex_unlock(&new_stream->lock);
743 pthread_mutex_unlock(&channel->lock);
c5c7998f 744 goto end_add_stream;
6dc3064a
DG
745 }
746
e1b71bdc
DG
747 /* Send stream to relayd if the stream has an ID. */
748 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
0c5b3718
SM
749 int ret_send_relayd_stream;
750
751 ret_send_relayd_stream = consumer_send_relayd_stream(
752 new_stream, new_stream->chan->pathname);
753 if (ret_send_relayd_stream < 0) {
d2956687
JG
754 pthread_mutex_unlock(&new_stream->lock);
755 pthread_mutex_unlock(&channel->lock);
e1b71bdc 756 consumer_stream_free(new_stream);
c5c7998f 757 goto error_add_stream_nosignal;
e1b71bdc 758 }
001b7e62
MD
759
760 /*
761 * If adding an extra stream to an already
762 * existing channel (e.g. cpu hotplug), we need
763 * to send the "streams_sent" command to relayd.
764 */
765 if (channel->streams_sent_to_relayd) {
0c5b3718
SM
766 int ret_send_relayd_streams_sent;
767
768 ret_send_relayd_streams_sent =
769 consumer_send_relayd_streams_sent(
770 new_stream->net_seq_idx);
771 if (ret_send_relayd_streams_sent < 0) {
d2956687
JG
772 pthread_mutex_unlock(&new_stream->lock);
773 pthread_mutex_unlock(&channel->lock);
c5c7998f 774 goto error_add_stream_nosignal;
001b7e62
MD
775 }
776 }
e2039c7a 777 }
d2956687
JG
778 pthread_mutex_unlock(&new_stream->lock);
779 pthread_mutex_unlock(&channel->lock);
e2039c7a 780
50f8ae69 781 /* Get the right pipe where the stream will be sent. */
633d0084 782 if (new_stream->metadata_flag) {
66d583dc 783 consumer_add_metadata_stream(new_stream);
dae10966 784 stream_pipe = ctx->consumer_metadata_pipe;
3bd1e081 785 } else {
66d583dc 786 consumer_add_data_stream(new_stream);
dae10966 787 stream_pipe = ctx->consumer_data_pipe;
50f8ae69
DG
788 }
789
66d583dc 790 /* Visible to other threads */
5ab66908
MD
791 new_stream->globally_visible = 1;
792
9ce5646a
MD
793 health_code_update();
794
0c5b3718
SM
795 ret_pipe_write = lttng_pipe_write(
796 stream_pipe, &new_stream, sizeof(new_stream));
797 if (ret_pipe_write < 0) {
dae10966 798 ERR("Consumer write %s stream to pipe %d",
50f8ae69 799 new_stream->metadata_flag ? "metadata" : "data",
dae10966 800 lttng_pipe_get_writefd(stream_pipe));
5ab66908
MD
801 if (new_stream->metadata_flag) {
802 consumer_del_stream_for_metadata(new_stream);
803 } else {
804 consumer_del_stream_for_data(new_stream);
805 }
c5c7998f 806 goto error_add_stream_nosignal;
3bd1e081 807 }
00e2e675 808
02d02e31
JD
809 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
810 new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
c5c7998f 811end_add_stream:
3bd1e081 812 break;
c5c7998f
JG
813error_add_stream_nosignal:
814 goto end_nosignal;
815error_add_stream_fatal:
816 goto error_fatal;
3bd1e081 817 }
a4baae1b
JD
818 case LTTNG_CONSUMER_STREAMS_SENT:
819 {
820 struct lttng_consumer_channel *channel;
0c5b3718 821 int ret_send_status;
a4baae1b
JD
822
823 /*
824 * Get stream's channel reference. Needed when adding the stream to the
825 * global hash table.
826 */
827 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
828 if (!channel) {
829 /*
830 * We could not find the channel. Can happen if cpu hotplug
831 * happens while tearing down.
832 */
833 ERR("Unable to find channel key %" PRIu64,
834 msg.u.sent_streams.channel_key);
e462382a 835 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
a4baae1b
JD
836 }
837
838 health_code_update();
839
840 /*
841 * Send status code to session daemon.
842 */
0c5b3718
SM
843 ret_send_status = consumer_send_status_msg(sock, ret_code);
844 if (ret_send_status < 0 ||
845 ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
a4baae1b 846 /* Somehow, the session daemon is not responding anymore. */
80d5a658 847 goto error_streams_sent_nosignal;
a4baae1b
JD
848 }
849
850 health_code_update();
851
852 /*
853 * We should not send this message if we don't monitor the
854 * streams in this channel.
855 */
856 if (!channel->monitor) {
80d5a658 857 goto end_error_streams_sent;
a4baae1b
JD
858 }
859
860 health_code_update();
861 /* Send stream to relayd if the stream has an ID. */
862 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
0c5b3718
SM
863 int ret_send_relay_streams;
864
865 ret_send_relay_streams = consumer_send_relayd_streams_sent(
a4baae1b 866 msg.u.sent_streams.net_seq_idx);
0c5b3718 867 if (ret_send_relay_streams < 0) {
80d5a658 868 goto error_streams_sent_nosignal;
a4baae1b 869 }
001b7e62 870 channel->streams_sent_to_relayd = true;
a4baae1b 871 }
80d5a658 872end_error_streams_sent:
a4baae1b 873 break;
80d5a658
JG
874error_streams_sent_nosignal:
875 goto end_nosignal;
a4baae1b 876 }
3bd1e081
MD
877 case LTTNG_CONSUMER_UPDATE_STREAM:
878 {
3f8e211f
DG
879 rcu_read_unlock();
880 return -ENOSYS;
881 }
882 case LTTNG_CONSUMER_DESTROY_RELAYD:
883 {
a6ba4fe1 884 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
3f8e211f 885 struct consumer_relayd_sock_pair *relayd;
0c5b3718 886 int ret_send_status;
3f8e211f 887
a6ba4fe1 888 DBG("Kernel consumer destroying relayd %" PRIu64, index);
3f8e211f
DG
889
890 /* Get relayd reference if exists. */
a6ba4fe1 891 relayd = consumer_find_relayd(index);
3f8e211f 892 if (relayd == NULL) {
3448e266 893 DBG("Unable to find relayd %" PRIu64, index);
e462382a 894 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
3bd1e081 895 }
3f8e211f 896
a6ba4fe1
DG
897 /*
898 * Each relayd socket pair has a refcount of stream attached to it
899 * which tells if the relayd is still active or not depending on the
900 * refcount value.
901 *
902 * This will set the destroy flag of the relayd object and destroy it
903 * if the refcount reaches zero when called.
904 *
905 * The destroy can happen either here or when a stream fd hangs up.
906 */
f50f23d9
DG
907 if (relayd) {
908 consumer_flag_relayd_for_destroy(relayd);
909 }
910
9ce5646a
MD
911 health_code_update();
912
0c5b3718
SM
913 ret_send_status = consumer_send_status_msg(sock, ret_code);
914 if (ret_send_status < 0) {
f50f23d9 915 /* Somehow, the session daemon is not responding anymore. */
1803a064 916 goto error_fatal;
f50f23d9 917 }
3f8e211f 918
3f8e211f 919 goto end_nosignal;
3bd1e081 920 }
6d805429 921 case LTTNG_CONSUMER_DATA_PENDING:
53632229 922 {
0c5b3718 923 int32_t ret_data_pending;
6d805429 924 uint64_t id = msg.u.data_pending.session_id;
0c5b3718 925 ssize_t ret_send;
c8f59ee5 926
6d805429 927 DBG("Kernel consumer data pending command for id %" PRIu64, id);
c8f59ee5 928
0c5b3718 929 ret_data_pending = consumer_data_pending(id);
c8f59ee5 930
9ce5646a
MD
931 health_code_update();
932
c8f59ee5 933 /* Send back returned value to session daemon */
0c5b3718
SM
934 ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
935 sizeof(ret_data_pending));
936 if (ret_send < 0) {
6d805429 937 PERROR("send data pending ret code");
1803a064 938 goto error_fatal;
c8f59ee5 939 }
f50f23d9
DG
940
941 /*
942 * No need to send back a status message since the data pending
943 * returned value is the response.
944 */
c8f59ee5 945 break;
53632229 946 }
6dc3064a
DG
947 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
948 {
3eb928aa
MD
949 struct lttng_consumer_channel *channel;
950 uint64_t key = msg.u.snapshot_channel.key;
0c5b3718 951 int ret_send_status;
3eb928aa
MD
952
953 channel = consumer_find_channel(key);
954 if (!channel) {
955 ERR("Channel %" PRIu64 " not found", key);
956 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52 957 } else {
3eb928aa 958 if (msg.u.snapshot_channel.metadata == 1) {
0c5b3718
SM
959 int ret_snapshot;
960
961 ret_snapshot = lttng_kconsumer_snapshot_metadata(
962 channel, key,
3eb928aa 963 msg.u.snapshot_channel.pathname,
0c5b3718
SM
964 msg.u.snapshot_channel.relayd_id,
965 ctx);
966 if (ret_snapshot < 0) {
3eb928aa
MD
967 ERR("Snapshot metadata failed");
968 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
969 }
970 } else {
0c5b3718
SM
971 int ret_snapshot;
972
973 ret_snapshot = lttng_kconsumer_snapshot_channel(
974 channel, key,
3eb928aa
MD
975 msg.u.snapshot_channel.pathname,
976 msg.u.snapshot_channel.relayd_id,
0c5b3718 977 msg.u.snapshot_channel
f46376a1 978 .nb_packets_per_stream);
0c5b3718 979 if (ret_snapshot < 0) {
3eb928aa
MD
980 ERR("Snapshot channel failed");
981 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
982 }
07b86b52
JD
983 }
984 }
9ce5646a
MD
985 health_code_update();
986
0c5b3718
SM
987 ret_send_status = consumer_send_status_msg(sock, ret_code);
988 if (ret_send_status < 0) {
6dc3064a
DG
989 /* Somehow, the session daemon is not responding anymore. */
990 goto end_nosignal;
991 }
992 break;
993 }
07b86b52
JD
994 case LTTNG_CONSUMER_DESTROY_CHANNEL:
995 {
996 uint64_t key = msg.u.destroy_channel.key;
997 struct lttng_consumer_channel *channel;
0c5b3718 998 int ret_send_status;
07b86b52
JD
999
1000 channel = consumer_find_channel(key);
1001 if (!channel) {
1002 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
e462382a 1003 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52
JD
1004 }
1005
9ce5646a
MD
1006 health_code_update();
1007
0c5b3718
SM
1008 ret_send_status = consumer_send_status_msg(sock, ret_code);
1009 if (ret_send_status < 0) {
07b86b52 1010 /* Somehow, the session daemon is not responding anymore. */
a9d36096 1011 goto end_destroy_channel;
07b86b52
JD
1012 }
1013
9ce5646a
MD
1014 health_code_update();
1015
15dc512a
DG
1016 /* Stop right now if no channel was found. */
1017 if (!channel) {
a9d36096 1018 goto end_destroy_channel;
15dc512a
DG
1019 }
1020
07b86b52
JD
1021 /*
1022 * This command should ONLY be issued for channel with streams set in
1023 * no monitor mode.
1024 */
a0377dfe 1025 LTTNG_ASSERT(!channel->monitor);
07b86b52
JD
1026
1027 /*
1028 * The refcount should ALWAYS be 0 in the case of a channel in no
1029 * monitor mode.
1030 */
a0377dfe 1031 LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
07b86b52
JD
1032
1033 consumer_del_channel(channel);
a9d36096 1034end_destroy_channel:
07b86b52
JD
1035 goto end_nosignal;
1036 }
fb83fe64
JD
1037 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1038 {
66ab32be
JD
1039 ssize_t ret;
1040 uint64_t count;
fb83fe64
JD
1041 struct lttng_consumer_channel *channel;
1042 uint64_t id = msg.u.discarded_events.session_id;
1043 uint64_t key = msg.u.discarded_events.channel_key;
1044
e5742757
MD
1045 DBG("Kernel consumer discarded events command for session id %"
1046 PRIu64 ", channel key %" PRIu64, id, key);
1047
fb83fe64
JD
1048 channel = consumer_find_channel(key);
1049 if (!channel) {
1050 ERR("Kernel consumer discarded events channel %"
1051 PRIu64 " not found", key);
66ab32be 1052 count = 0;
e5742757 1053 } else {
66ab32be 1054 count = channel->discarded_events;
fb83fe64
JD
1055 }
1056
fb83fe64
JD
1057 health_code_update();
1058
1059 /* Send back returned value to session daemon */
66ab32be 1060 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1061 if (ret < 0) {
1062 PERROR("send discarded events");
1063 goto error_fatal;
1064 }
1065
1066 break;
1067 }
1068 case LTTNG_CONSUMER_LOST_PACKETS:
1069 {
66ab32be
JD
1070 ssize_t ret;
1071 uint64_t count;
fb83fe64
JD
1072 struct lttng_consumer_channel *channel;
1073 uint64_t id = msg.u.lost_packets.session_id;
1074 uint64_t key = msg.u.lost_packets.channel_key;
1075
e5742757
MD
1076 DBG("Kernel consumer lost packets command for session id %"
1077 PRIu64 ", channel key %" PRIu64, id, key);
1078
fb83fe64
JD
1079 channel = consumer_find_channel(key);
1080 if (!channel) {
1081 ERR("Kernel consumer lost packets channel %"
1082 PRIu64 " not found", key);
66ab32be 1083 count = 0;
e5742757 1084 } else {
66ab32be 1085 count = channel->lost_packets;
fb83fe64
JD
1086 }
1087
fb83fe64
JD
1088 health_code_update();
1089
1090 /* Send back returned value to session daemon */
66ab32be 1091 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1092 if (ret < 0) {
1093 PERROR("send lost packets");
1094 goto error_fatal;
1095 }
1096
1097 break;
1098 }
b3530820
JG
1099 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1100 {
1101 int channel_monitor_pipe;
0c5b3718
SM
1102 int ret_send_status, ret_set_channel_monitor_pipe;
1103 ssize_t ret_recv;
b3530820
JG
1104
1105 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1106 /* Successfully received the command's type. */
0c5b3718
SM
1107 ret_send_status = consumer_send_status_msg(sock, ret_code);
1108 if (ret_send_status < 0) {
b3530820
JG
1109 goto error_fatal;
1110 }
1111
0c5b3718
SM
1112 ret_recv = lttcomm_recv_fds_unix_sock(
1113 sock, &channel_monitor_pipe, 1);
1114 if (ret_recv != sizeof(channel_monitor_pipe)) {
b3530820
JG
1115 ERR("Failed to receive channel monitor pipe");
1116 goto error_fatal;
1117 }
1118
1119 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
0c5b3718
SM
1120 ret_set_channel_monitor_pipe =
1121 consumer_timer_thread_set_channel_monitor_pipe(
1122 channel_monitor_pipe);
1123 if (!ret_set_channel_monitor_pipe) {
b3530820 1124 int flags;
0c5b3718 1125 int ret_fcntl;
b3530820
JG
1126
1127 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1128 /* Set the pipe as non-blocking. */
0c5b3718
SM
1129 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
1130 if (ret_fcntl == -1) {
b3530820
JG
1131 PERROR("fcntl get flags of the channel monitoring pipe");
1132 goto error_fatal;
1133 }
0c5b3718 1134 flags = ret_fcntl;
b3530820 1135
0c5b3718 1136 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
b3530820 1137 flags | O_NONBLOCK);
0c5b3718 1138 if (ret_fcntl == -1) {
b3530820
JG
1139 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1140 goto error_fatal;
1141 }
1142 DBG("Channel monitor pipe set as non-blocking");
1143 } else {
1144 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1145 }
0c5b3718
SM
1146 ret_send_status = consumer_send_status_msg(sock, ret_code);
1147 if (ret_send_status < 0) {
b3530820
JG
1148 goto error_fatal;
1149 }
1150 break;
1151 }
b99a8d42
JD
1152 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1153 {
92b7a7f8
MD
1154 struct lttng_consumer_channel *channel;
1155 uint64_t key = msg.u.rotate_channel.key;
0c5b3718 1156 int ret_send_status;
b99a8d42 1157
92b7a7f8 1158 DBG("Consumer rotate channel %" PRIu64, key);
b99a8d42 1159
92b7a7f8
MD
1160 channel = consumer_find_channel(key);
1161 if (!channel) {
1162 ERR("Channel %" PRIu64 " not found", key);
1163 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1164 } else {
1165 /*
1166 * Sample the rotate position of all the streams in this channel.
1167 */
0c5b3718
SM
1168 int ret_rotate_channel;
1169
1170 ret_rotate_channel = lttng_consumer_rotate_channel(
1171 channel, key,
f46376a1 1172 msg.u.rotate_channel.relayd_id);
0c5b3718 1173 if (ret_rotate_channel < 0) {
92b7a7f8
MD
1174 ERR("Rotate channel failed");
1175 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1176 }
b99a8d42 1177
92b7a7f8
MD
1178 health_code_update();
1179 }
0c5b3718
SM
1180
1181 ret_send_status = consumer_send_status_msg(sock, ret_code);
1182 if (ret_send_status < 0) {
b99a8d42 1183 /* Somehow, the session daemon is not responding anymore. */
713bdd26 1184 goto error_rotate_channel;
b99a8d42 1185 }
92b7a7f8
MD
1186 if (channel) {
1187 /* Rotate the streams that are ready right now. */
0c5b3718
SM
1188 int ret_rotate;
1189
1190 ret_rotate = lttng_consumer_rotate_ready_streams(
f46376a1 1191 channel, key);
0c5b3718 1192 if (ret_rotate < 0) {
92b7a7f8
MD
1193 ERR("Rotate ready streams failed");
1194 }
b99a8d42 1195 }
b99a8d42 1196 break;
713bdd26
JG
1197error_rotate_channel:
1198 goto end_nosignal;
b99a8d42 1199 }
5f3aff8b
MD
1200 case LTTNG_CONSUMER_CLEAR_CHANNEL:
1201 {
1202 struct lttng_consumer_channel *channel;
1203 uint64_t key = msg.u.clear_channel.key;
0c5b3718 1204 int ret_send_status;
5f3aff8b
MD
1205
1206 channel = consumer_find_channel(key);
1207 if (!channel) {
1208 DBG("Channel %" PRIu64 " not found", key);
1209 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1210 } else {
0c5b3718
SM
1211 int ret_clear_channel;
1212
1213 ret_clear_channel =
1214 lttng_consumer_clear_channel(channel);
1215 if (ret_clear_channel) {
5f3aff8b 1216 ERR("Clear channel failed");
97535efa 1217 ret_code = (lttcomm_return_code) ret_clear_channel;
5f3aff8b
MD
1218 }
1219
1220 health_code_update();
1221 }
0c5b3718
SM
1222
1223 ret_send_status = consumer_send_status_msg(sock, ret_code);
1224 if (ret_send_status < 0) {
5f3aff8b
MD
1225 /* Somehow, the session daemon is not responding anymore. */
1226 goto end_nosignal;
1227 }
1228
1229 break;
1230 }
d2956687 1231 case LTTNG_CONSUMER_INIT:
00fb02ac 1232 {
0c5b3718 1233 int ret_send_status;
328c2fe7
JG
1234 lttng_uuid sessiond_uuid;
1235
1236 std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
1237 sessiond_uuid.begin());
0c5b3718 1238
d2956687 1239 ret_code = lttng_consumer_init_command(ctx,
328c2fe7 1240 sessiond_uuid);
00fb02ac 1241 health_code_update();
0c5b3718
SM
1242 ret_send_status = consumer_send_status_msg(sock, ret_code);
1243 if (ret_send_status < 0) {
00fb02ac
JD
1244 /* Somehow, the session daemon is not responding anymore. */
1245 goto end_nosignal;
1246 }
1247 break;
1248 }
d2956687 1249 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
d88744a4 1250 {
d2956687 1251 const struct lttng_credentials credentials = {
ff588497
JR
1252 .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
1253 .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
d2956687
JG
1254 };
1255 const bool is_local_trace =
1256 !msg.u.create_trace_chunk.relayd_id.is_set;
1257 const uint64_t relayd_id =
1258 msg.u.create_trace_chunk.relayd_id.value;
1259 const char *chunk_override_name =
1260 *msg.u.create_trace_chunk.override_name ?
1261 msg.u.create_trace_chunk.override_name :
1262 NULL;
cbf53d23 1263 struct lttng_directory_handle *chunk_directory_handle = NULL;
d88744a4 1264
d2956687
JG
1265 /*
1266 * The session daemon will only provide a chunk directory file
1267 * descriptor for local traces.
1268 */
1269 if (is_local_trace) {
1270 int chunk_dirfd;
0c5b3718
SM
1271 int ret_send_status;
1272 ssize_t ret_recv;
19990ed5 1273
d2956687 1274 /* Acknowledge the reception of the command. */
0c5b3718
SM
1275 ret_send_status = consumer_send_status_msg(
1276 sock, LTTCOMM_CONSUMERD_SUCCESS);
1277 if (ret_send_status < 0) {
d2956687
JG
1278 /* Somehow, the session daemon is not responding anymore. */
1279 goto end_nosignal;
1280 }
92816cc3 1281
0c5b3718
SM
1282 ret_recv = lttcomm_recv_fds_unix_sock(
1283 sock, &chunk_dirfd, 1);
1284 if (ret_recv != sizeof(chunk_dirfd)) {
d2956687
JG
1285 ERR("Failed to receive trace chunk directory file descriptor");
1286 goto error_fatal;
1287 }
92816cc3 1288
d2956687
JG
1289 DBG("Received trace chunk directory fd (%d)",
1290 chunk_dirfd);
cbf53d23 1291 chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
d2956687 1292 chunk_dirfd);
cbf53d23 1293 if (!chunk_directory_handle) {
d2956687
JG
1294 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1295 if (close(chunk_dirfd)) {
1296 PERROR("Failed to close chunk directory file descriptor");
1297 }
1298 goto error_fatal;
1299 }
92816cc3
JG
1300 }
1301
d2956687
JG
1302 ret_code = lttng_consumer_create_trace_chunk(
1303 !is_local_trace ? &relayd_id : NULL,
1304 msg.u.create_trace_chunk.session_id,
1305 msg.u.create_trace_chunk.chunk_id,
e5add6d0
JG
1306 (time_t) msg.u.create_trace_chunk
1307 .creation_timestamp,
d2956687 1308 chunk_override_name,
e5add6d0
JG
1309 msg.u.create_trace_chunk.credentials.is_set ?
1310 &credentials :
1311 NULL,
cbf53d23
JG
1312 chunk_directory_handle);
1313 lttng_directory_handle_put(chunk_directory_handle);
d2956687 1314 goto end_msg_sessiond;
d88744a4 1315 }
d2956687 1316 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
a1ae2ea5 1317 {
bbc4768c 1318 enum lttng_trace_chunk_command_type close_command =
97535efa 1319 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
d2956687
JG
1320 const uint64_t relayd_id =
1321 msg.u.close_trace_chunk.relayd_id.value;
ecd1a12f
MD
1322 struct lttcomm_consumer_close_trace_chunk_reply reply;
1323 char path[LTTNG_PATH_MAX];
0c5b3718 1324 ssize_t ret_send;
d2956687
JG
1325
1326 ret_code = lttng_consumer_close_trace_chunk(
1327 msg.u.close_trace_chunk.relayd_id.is_set ?
bbc4768c
JG
1328 &relayd_id :
1329 NULL,
d2956687
JG
1330 msg.u.close_trace_chunk.session_id,
1331 msg.u.close_trace_chunk.chunk_id,
bbc4768c
JG
1332 (time_t) msg.u.close_trace_chunk.close_timestamp,
1333 msg.u.close_trace_chunk.close_command.is_set ?
1334 &close_command :
ecd1a12f
MD
1335 NULL, path);
1336 reply.ret_code = ret_code;
1337 reply.path_length = strlen(path) + 1;
0c5b3718
SM
1338 ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1339 if (ret_send != sizeof(reply)) {
ecd1a12f
MD
1340 goto error_fatal;
1341 }
0c5b3718
SM
1342 ret_send = lttcomm_send_unix_sock(
1343 sock, path, reply.path_length);
1344 if (ret_send != reply.path_length) {
ecd1a12f
MD
1345 goto error_fatal;
1346 }
1347 goto end_nosignal;
3654ed19 1348 }
d2956687 1349 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
3654ed19 1350 {
d2956687
JG
1351 const uint64_t relayd_id =
1352 msg.u.trace_chunk_exists.relayd_id.value;
1353
1354 ret_code = lttng_consumer_trace_chunk_exists(
1355 msg.u.trace_chunk_exists.relayd_id.is_set ?
1356 &relayd_id : NULL,
1357 msg.u.trace_chunk_exists.session_id,
1358 msg.u.trace_chunk_exists.chunk_id);
1359 goto end_msg_sessiond;
a1ae2ea5 1360 }
04ed9e10
JG
1361 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
1362 {
1363 const uint64_t key = msg.u.open_channel_packets.key;
1364 struct lttng_consumer_channel *channel =
1365 consumer_find_channel(key);
1366
1367 if (channel) {
1368 pthread_mutex_lock(&channel->lock);
1369 ret_code = lttng_consumer_open_channel_packets(channel);
1370 pthread_mutex_unlock(&channel->lock);
1371 } else {
1372 WARN("Channel %" PRIu64 " not found", key);
1373 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1374 }
1375
1376 health_code_update();
1377 goto end_msg_sessiond;
1378 }
3bd1e081 1379 default:
3f8e211f 1380 goto end_nosignal;
3bd1e081 1381 }
3f8e211f 1382
3bd1e081 1383end_nosignal:
4cbc1a04
DG
1384 /*
1385 * Return 1 to indicate success since the 0 value can be a socket
1386 * shutdown during the recv() or send() call.
1387 */
0c5b3718 1388 ret_func = 1;
c5c7998f
JG
1389 goto end;
1390error_fatal:
1391 /* This will issue a consumer stop. */
0c5b3718 1392 ret_func = -1;
c5c7998f 1393 goto end;
d2956687
JG
1394end_msg_sessiond:
1395 /*
1396 * The returned value here is not useful since either way we'll return 1 to
1397 * the caller because the session daemon socket management is done
1398 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1399 */
0c5b3718
SM
1400 {
1401 int ret_send_status;
1402
1403 ret_send_status = consumer_send_status_msg(sock, ret_code);
1404 if (ret_send_status < 0) {
1405 goto error_fatal;
1406 }
d2956687 1407 }
0c5b3718
SM
1408
1409 ret_func = 1;
1410
c5c7998f 1411end:
d2956687 1412 health_code_update();
1803a064 1413 rcu_read_unlock();
0c5b3718 1414 return ret_func;
3bd1e081 1415}
d41f73b7 1416
94d49140
JD
1417/*
1418 * Sync metadata meaning request them to the session daemon and snapshot to the
1419 * metadata thread can consumer them.
1420 *
1421 * Metadata stream lock MUST be acquired.
94d49140 1422 */
577eea73
JG
1423enum sync_metadata_status lttng_kconsumer_sync_metadata(
1424 struct lttng_consumer_stream *metadata)
94d49140
JD
1425{
1426 int ret;
577eea73 1427 enum sync_metadata_status status;
94d49140 1428
a0377dfe 1429 LTTNG_ASSERT(metadata);
94d49140
JD
1430
1431 ret = kernctl_buffer_flush(metadata->wait_fd);
1432 if (ret < 0) {
1433 ERR("Failed to flush kernel stream");
577eea73 1434 status = SYNC_METADATA_STATUS_ERROR;
94d49140
JD
1435 goto end;
1436 }
1437
1438 ret = kernctl_snapshot(metadata->wait_fd);
1439 if (ret < 0) {
577eea73
JG
1440 if (errno == EAGAIN) {
1441 /* No new metadata, exit. */
1442 DBG("Sync metadata, no new kernel metadata");
1443 status = SYNC_METADATA_STATUS_NO_DATA;
1444 } else {
94d49140 1445 ERR("Sync metadata, taking kernel snapshot failed.");
577eea73 1446 status = SYNC_METADATA_STATUS_ERROR;
94d49140 1447 }
577eea73
JG
1448 } else {
1449 status = SYNC_METADATA_STATUS_NEW_DATA;
94d49140
JD
1450 }
1451
1452end:
577eea73 1453 return status;
94d49140 1454}
309167d2 1455
fb83fe64 1456static
6f9449c2
JG
1457int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
1458 struct stream_subbuffer *subbuf)
fb83fe64
JD
1459{
1460 int ret;
fb83fe64 1461
6f9449c2
JG
1462 ret = kernctl_get_subbuf_size(
1463 stream->wait_fd, &subbuf->info.data.subbuf_size);
1464 if (ret) {
fb83fe64
JD
1465 goto end;
1466 }
fb83fe64 1467
6f9449c2
JG
1468 ret = kernctl_get_padded_subbuf_size(
1469 stream->wait_fd, &subbuf->info.data.padded_subbuf_size);
1470 if (ret) {
fb83fe64
JD
1471 goto end;
1472 }
fb83fe64
JD
1473
1474end:
1475 return ret;
1476}
1477
93ec662e 1478static
6f9449c2
JG
1479int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
1480 struct stream_subbuffer *subbuf)
93ec662e
JD
1481{
1482 int ret;
93ec662e 1483
6f9449c2
JG
1484 ret = extract_common_subbuffer_info(stream, subbuf);
1485 if (ret) {
93ec662e
JD
1486 goto end;
1487 }
1488
6f9449c2
JG
1489 ret = kernctl_get_metadata_version(
1490 stream->wait_fd, &subbuf->info.metadata.version);
1491 if (ret) {
93ec662e
JD
1492 goto end;
1493 }
1494
93ec662e
JD
1495end:
1496 return ret;
1497}
1498
6f9449c2
JG
1499static
1500int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
1501 struct stream_subbuffer *subbuf)
d41f73b7 1502{
6f9449c2 1503 int ret;
d41f73b7 1504
6f9449c2
JG
1505 ret = extract_common_subbuffer_info(stream, subbuf);
1506 if (ret) {
1507 goto end;
1508 }
309167d2 1509
6f9449c2
JG
1510 ret = kernctl_get_packet_size(
1511 stream->wait_fd, &subbuf->info.data.packet_size);
1512 if (ret < 0) {
1513 PERROR("Failed to get sub-buffer packet size");
1514 goto end;
1515 }
02d02e31 1516
6f9449c2
JG
1517 ret = kernctl_get_content_size(
1518 stream->wait_fd, &subbuf->info.data.content_size);
1519 if (ret < 0) {
1520 PERROR("Failed to get sub-buffer content size");
1521 goto end;
d41f73b7
MD
1522 }
1523
6f9449c2
JG
1524 ret = kernctl_get_timestamp_begin(
1525 stream->wait_fd, &subbuf->info.data.timestamp_begin);
1526 if (ret < 0) {
1527 PERROR("Failed to get sub-buffer begin timestamp");
1528 goto end;
1d4dfdef
DG
1529 }
1530
6f9449c2
JG
1531 ret = kernctl_get_timestamp_end(
1532 stream->wait_fd, &subbuf->info.data.timestamp_end);
1533 if (ret < 0) {
1534 PERROR("Failed to get sub-buffer end timestamp");
1535 goto end;
1536 }
1537
1538 ret = kernctl_get_events_discarded(
1539 stream->wait_fd, &subbuf->info.data.events_discarded);
1540 if (ret) {
1541 PERROR("Failed to get sub-buffer events discarded count");
1542 goto end;
1543 }
1544
1545 ret = kernctl_get_sequence_number(stream->wait_fd,
1546 &subbuf->info.data.sequence_number.value);
1547 if (ret) {
1548 /* May not be supported by older LTTng-modules. */
1549 if (ret != -ENOTTY) {
1550 PERROR("Failed to get sub-buffer sequence number");
1551 goto end;
fb83fe64 1552 }
1c20f0e2 1553 } else {
6f9449c2 1554 subbuf->info.data.sequence_number.is_set = true;
309167d2
JD
1555 }
1556
6f9449c2
JG
1557 ret = kernctl_get_stream_id(
1558 stream->wait_fd, &subbuf->info.data.stream_id);
1559 if (ret < 0) {
1560 PERROR("Failed to get stream id");
1561 goto end;
1562 }
1d4dfdef 1563
6f9449c2
JG
1564 ret = kernctl_get_instance_id(stream->wait_fd,
1565 &subbuf->info.data.stream_instance_id.value);
1566 if (ret) {
1567 /* May not be supported by older LTTng-modules. */
1568 if (ret != -ENOTTY) {
1569 PERROR("Failed to get stream instance id");
1570 goto end;
1d4dfdef 1571 }
6f9449c2
JG
1572 } else {
1573 subbuf->info.data.stream_instance_id.is_set = true;
1574 }
1575end:
1576 return ret;
1577}
47e81c02 1578
6f9449c2 1579static
b6797c8e
JG
1580enum get_next_subbuffer_status get_subbuffer_common(
1581 struct lttng_consumer_stream *stream,
6f9449c2
JG
1582 struct stream_subbuffer *subbuffer)
1583{
1584 int ret;
b6797c8e 1585 enum get_next_subbuffer_status status;
6f9449c2
JG
1586
1587 ret = kernctl_get_next_subbuf(stream->wait_fd);
b6797c8e
JG
1588 switch (ret) {
1589 case 0:
1590 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1591 break;
1592 case -ENODATA:
1593 case -EAGAIN:
6e5e3c51
MD
1594 /*
1595 * The caller only expects -ENODATA when there is no data to
1596 * read, but the kernel tracer returns -EAGAIN when there is
1597 * currently no data for a non-finalized stream, and -ENODATA
1598 * when there is no data for a finalized stream. Those can be
1599 * combined into a -ENODATA return value.
1600 */
b6797c8e
JG
1601 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1602 goto end;
1603 default:
1604 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2
JG
1605 goto end;
1606 }
1607
1608 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
b6797c8e
JG
1609 stream, subbuffer);
1610 if (ret) {
1611 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1612 }
6f9449c2 1613end:
b6797c8e 1614 return status;
6f9449c2 1615}
128708c3 1616
6f9449c2 1617static
b6797c8e
JG
1618enum get_next_subbuffer_status get_next_subbuffer_splice(
1619 struct lttng_consumer_stream *stream,
6f9449c2
JG
1620 struct stream_subbuffer *subbuffer)
1621{
b6797c8e
JG
1622 const enum get_next_subbuffer_status status =
1623 get_subbuffer_common(stream, subbuffer);
1d4dfdef 1624
b6797c8e 1625 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
6f9449c2
JG
1626 goto end;
1627 }
1d4dfdef 1628
6f9449c2
JG
1629 subbuffer->buffer.fd = stream->wait_fd;
1630end:
b6797c8e 1631 return status;
6f9449c2 1632}
fd424d99 1633
6f9449c2 1634static
b6797c8e
JG
1635enum get_next_subbuffer_status get_next_subbuffer_mmap(
1636 struct lttng_consumer_stream *stream,
6f9449c2
JG
1637 struct stream_subbuffer *subbuffer)
1638{
1639 int ret;
b6797c8e 1640 enum get_next_subbuffer_status status;
6f9449c2
JG
1641 const char *addr;
1642
b6797c8e
JG
1643 status = get_subbuffer_common(stream, subbuffer);
1644 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
6f9449c2 1645 goto end;
128708c3 1646 }
6f9449c2
JG
1647
1648 ret = get_current_subbuf_addr(stream, &addr);
1649 if (ret) {
b6797c8e 1650 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2 1651 goto end;
d41f73b7 1652 }
6f9449c2
JG
1653
1654 subbuffer->buffer.buffer = lttng_buffer_view_init(
1655 addr, 0, subbuffer->info.data.padded_subbuf_size);
1656end:
b6797c8e 1657 return status;
6f9449c2
JG
1658}
1659
f5ba75b4 1660static
b6797c8e 1661enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
f5ba75b4
JG
1662 struct stream_subbuffer *subbuffer)
1663{
1664 int ret;
1665 const char *addr;
1666 bool coherent;
b6797c8e 1667 enum get_next_subbuffer_status status;
f5ba75b4
JG
1668
1669 ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
1670 &coherent);
1671 if (ret) {
1672 goto end;
1673 }
1674
1675 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
1676 stream, subbuffer);
1677 if (ret) {
1678 goto end;
1679 }
1680
1681 LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
1682
1683 ret = get_current_subbuf_addr(stream, &addr);
1684 if (ret) {
1685 goto end;
1686 }
1687
1688 subbuffer->buffer.buffer = lttng_buffer_view_init(
1689 addr, 0, subbuffer->info.data.padded_subbuf_size);
1690 DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
1691 subbuffer->info.metadata.padded_subbuf_size,
1692 coherent ? "true" : "false");
1693end:
6e5e3c51
MD
1694 /*
1695 * The caller only expects -ENODATA when there is no data to read, but
1696 * the kernel tracer returns -EAGAIN when there is currently no data
1697 * for a non-finalized stream, and -ENODATA when there is no data for a
1698 * finalized stream. Those can be combined into a -ENODATA return value.
1699 */
b6797c8e
JG
1700 switch (ret) {
1701 case 0:
1702 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1703 break;
1704 case -ENODATA:
1705 case -EAGAIN:
1706 /*
1707 * The caller only expects -ENODATA when there is no data to
1708 * read, but the kernel tracer returns -EAGAIN when there is
1709 * currently no data for a non-finalized stream, and -ENODATA
1710 * when there is no data for a finalized stream. Those can be
1711 * combined into a -ENODATA return value.
1712 */
1713 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1714 break;
1715 default:
1716 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1717 break;
6e5e3c51
MD
1718 }
1719
b6797c8e 1720 return status;
f5ba75b4
JG
1721}
1722
6f9449c2
JG
1723static
1724int put_next_subbuffer(struct lttng_consumer_stream *stream,
f46376a1 1725 struct stream_subbuffer *subbuffer __attribute__((unused)))
6f9449c2
JG
1726{
1727 const int ret = kernctl_put_next_subbuf(stream->wait_fd);
1728
1729 if (ret) {
1730 if (ret == -EFAULT) {
1731 PERROR("Error in unreserving sub buffer");
1732 } else if (ret == -EIO) {
d41f73b7 1733 /* Should never happen with newer LTTng versions */
6f9449c2 1734 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
d41f73b7 1735 }
d41f73b7
MD
1736 }
1737
6f9449c2
JG
1738 return ret;
1739}
1c20f0e2 1740
f5ba75b4
JG
1741static
1742bool is_get_next_check_metadata_available(int tracer_fd)
1743{
741e787b
JG
1744 const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
1745 const bool available = ret != -ENOTTY;
1746
1747 if (ret == 0) {
1748 /* get succeeded, make sure to put the subbuffer. */
1749 kernctl_put_subbuf(tracer_fd);
1750 }
1751
1752 return available;
f5ba75b4
JG
1753}
1754
091441eb
MD
1755static
1756int signal_metadata(struct lttng_consumer_stream *stream,
f46376a1 1757 struct lttng_consumer_local_data *ctx __attribute__((unused)))
091441eb
MD
1758{
1759 ASSERT_LOCKED(stream->metadata_rdv_lock);
1760 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
1761}
1762
f5ba75b4
JG
1763static
1764int lttng_kconsumer_set_stream_ops(
6f9449c2
JG
1765 struct lttng_consumer_stream *stream)
1766{
f5ba75b4
JG
1767 int ret = 0;
1768
1769 if (stream->metadata_flag && stream->chan->is_live) {
1770 DBG("Attempting to enable metadata bucketization for live consumers");
1771 if (is_get_next_check_metadata_available(stream->wait_fd)) {
1772 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1773 stream->read_subbuffer_ops.get_next_subbuffer =
1774 get_next_subbuffer_metadata_check;
1775 ret = consumer_stream_enable_metadata_bucketization(
1776 stream);
1777 if (ret) {
1778 goto end;
1779 }
1780 } else {
1781 /*
1782 * The kernel tracer version is too old to indicate
1783 * when the metadata stream has reached a "coherent"
1784 * (parseable) point.
1785 *
1786 * This means that a live viewer may see an incoherent
1787 * sequence of metadata and fail to parse it.
1788 */
1789 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1790 metadata_bucket_destroy(stream->metadata_bucket);
1791 stream->metadata_bucket = NULL;
1792 }
091441eb
MD
1793
1794 stream->read_subbuffer_ops.on_sleep = signal_metadata;
f5ba75b4
JG
1795 }
1796
1797 if (!stream->read_subbuffer_ops.get_next_subbuffer) {
1798 if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
1799 stream->read_subbuffer_ops.get_next_subbuffer =
1800 get_next_subbuffer_mmap;
1801 } else {
1802 stream->read_subbuffer_ops.get_next_subbuffer =
1803 get_next_subbuffer_splice;
1804 }
94d49140
JD
1805 }
1806
6f9449c2
JG
1807 if (stream->metadata_flag) {
1808 stream->read_subbuffer_ops.extract_subbuffer_info =
1809 extract_metadata_subbuffer_info;
1810 } else {
1811 stream->read_subbuffer_ops.extract_subbuffer_info =
1812 extract_data_subbuffer_info;
1813 if (stream->chan->is_live) {
1814 stream->read_subbuffer_ops.send_live_beacon =
1815 consumer_flush_kernel_index;
1816 }
309167d2
JD
1817 }
1818
6f9449c2 1819 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
f5ba75b4
JG
1820end:
1821 return ret;
d41f73b7
MD
1822}
1823
1824int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1825{
1826 int ret;
ffe60014 1827
a0377dfe 1828 LTTNG_ASSERT(stream);
ffe60014 1829
2bba9e53 1830 /*
d2956687
JG
1831 * Don't create anything if this is set for streaming or if there is
1832 * no current trace chunk on the parent channel.
2bba9e53 1833 */
d2956687
JG
1834 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
1835 stream->chan->trace_chunk) {
1836 ret = consumer_stream_create_output_files(stream, true);
1837 if (ret) {
fe4477ee
JD
1838 goto error;
1839 }
ffe60014 1840 }
d41f73b7 1841
d41f73b7
MD
1842 if (stream->output == LTTNG_EVENT_MMAP) {
1843 /* get the len of the mmap region */
1844 unsigned long mmap_len;
1845
1846 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
1847 if (ret != 0) {
ffe60014 1848 PERROR("kernctl_get_mmap_len");
d41f73b7
MD
1849 goto error_close_fd;
1850 }
1851 stream->mmap_len = (size_t) mmap_len;
1852
ffe60014
DG
1853 stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
1854 MAP_PRIVATE, stream->wait_fd, 0);
d41f73b7 1855 if (stream->mmap_base == MAP_FAILED) {
ffe60014 1856 PERROR("Error mmaping");
d41f73b7
MD
1857 ret = -1;
1858 goto error_close_fd;
1859 }
1860 }
1861
f5ba75b4
JG
1862 ret = lttng_kconsumer_set_stream_ops(stream);
1863 if (ret) {
1864 goto error_close_fd;
1865 }
6f9449c2 1866
d41f73b7
MD
1867 /* we return 0 to let the library handle the FD internally */
1868 return 0;
1869
1870error_close_fd:
2f225ce2 1871 if (stream->out_fd >= 0) {
d41f73b7
MD
1872 int err;
1873
1874 err = close(stream->out_fd);
a0377dfe 1875 LTTNG_ASSERT(!err);
2f225ce2 1876 stream->out_fd = -1;
d41f73b7
MD
1877 }
1878error:
1879 return ret;
1880}
1881
ca22feea
DG
1882/*
1883 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
1884 * stream. Consumer data lock MUST be acquired before calling this function
1885 * and the stream lock.
ca22feea 1886 *
6d805429 1887 * Return 1 if the traced data are still getting read else 0 meaning that the
ca22feea
DG
1888 * data is available for trace viewer reading.
1889 */
6d805429 1890int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
1891{
1892 int ret;
1893
a0377dfe 1894 LTTNG_ASSERT(stream);
ca22feea 1895
873b9e9a
MD
1896 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1897 ret = 0;
1898 goto end;
1899 }
1900
ca22feea
DG
1901 ret = kernctl_get_next_subbuf(stream->wait_fd);
1902 if (ret == 0) {
1903 /* There is still data so let's put back this subbuffer. */
1904 ret = kernctl_put_subbuf(stream->wait_fd);
a0377dfe 1905 LTTNG_ASSERT(ret == 0);
6d805429 1906 ret = 1; /* Data is pending */
4e9a4686 1907 goto end;
ca22feea
DG
1908 }
1909
6d805429
DG
1910 /* Data is NOT pending and ready to be read. */
1911 ret = 0;
ca22feea 1912
6efae65e
DG
1913end:
1914 return ret;
ca22feea 1915}
This page took 0.278863 seconds and 5 git commands to generate.