Move LTTng-UST buffer ownership from application to consumer
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
d14d33bf
AM
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
3bd1e081
MD
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
d14d33bf
AM
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
f02e1e8a 21#include <lttng/ust-ctl.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
dbb5dfe6 28#include <sys/stat.h>
3bd1e081 29#include <sys/types.h>
77c7c900 30#include <inttypes.h>
3bd1e081 31#include <unistd.h>
ffe60014 32#include <urcu/list.h>
0857097f 33
990570ed 34#include <common/common.h>
10a8a223 35#include <common/sessiond-comm/sessiond-comm.h>
00e2e675 36#include <common/relayd/relayd.h>
dbb5dfe6 37#include <common/compat/fcntl.h>
10a8a223
DG
38
39#include "ust-consumer.h"
3bd1e081
MD
40
41extern struct lttng_consumer_global_data consumer_data;
42extern int consumer_poll_timeout;
43extern volatile int consumer_quit;
44
45/*
ffe60014
DG
46 * Free channel object and all streams associated with it. This MUST be used
47 * only and only if the channel has _NEVER_ been added to the global channel
48 * hash table.
3bd1e081 49 */
ffe60014 50static void destroy_channel(struct lttng_consumer_channel *channel)
3bd1e081 51{
ffe60014
DG
52 struct lttng_consumer_stream *stream, *stmp;
53
54 assert(channel);
55
56 DBG("UST consumer cleaning stream list");
57
58 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
59 send_node) {
60 cds_list_del(&stream->send_node);
61 ustctl_destroy_stream(stream->ustream);
62 free(stream);
63 }
64
65 /*
66 * If a channel is available meaning that was created before the streams
67 * were, delete it.
68 */
69 if (channel->uchan) {
70 lttng_ustconsumer_del_channel(channel);
71 }
72 free(channel);
73}
3bd1e081
MD
74
75/*
ffe60014 76 * Add channel to internal consumer state.
3bd1e081 77 *
ffe60014 78 * Returns 0 on success or else a negative value.
3bd1e081 79 */
ffe60014
DG
80static int add_channel(struct lttng_consumer_channel *channel,
81 struct lttng_consumer_local_data *ctx)
3bd1e081
MD
82{
83 int ret = 0;
84
ffe60014
DG
85 assert(channel);
86 assert(ctx);
87
88 if (ctx->on_recv_channel != NULL) {
89 ret = ctx->on_recv_channel(channel);
90 if (ret == 0) {
91 ret = consumer_add_channel(channel);
92 } else if (ret < 0) {
93 /* Most likely an ENOMEM. */
94 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
95 goto error;
96 }
97 } else {
98 ret = consumer_add_channel(channel);
3bd1e081
MD
99 }
100
ffe60014
DG
101 DBG("UST consumer channel added (key: %u)", channel->key);
102
103error:
3bd1e081
MD
104 return ret;
105}
106
107/*
ffe60014
DG
108 * Allocate and return a consumer channel object.
109 */
110static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
111 const char *pathname, const char *name, uid_t uid, gid_t gid,
112 int relayd_id, unsigned long key, enum lttng_event_output output)
113{
114 assert(pathname);
115 assert(name);
116
117 return consumer_allocate_channel(key, session_id, pathname, name, uid, gid,
118 relayd_id, output);
119}
120
121/*
122 * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the
123 * error value if applicable is set in it else it is kept untouched.
3bd1e081 124 *
ffe60014 125 * Return NULL on error else the newly allocated stream object.
3bd1e081 126 */
ffe60014
DG
127static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
128 struct lttng_consumer_channel *channel,
129 struct lttng_consumer_local_data *ctx, int *_alloc_ret)
130{
131 int alloc_ret;
132 struct lttng_consumer_stream *stream = NULL;
133
134 assert(channel);
135 assert(ctx);
136
137 stream = consumer_allocate_stream(channel->key,
138 key,
139 LTTNG_CONSUMER_ACTIVE_STREAM,
140 channel->name,
141 channel->uid,
142 channel->gid,
143 channel->relayd_id,
144 channel->session_id,
145 cpu,
146 &alloc_ret,
147 channel->type);
148 if (stream == NULL) {
149 switch (alloc_ret) {
150 case -ENOENT:
151 /*
152 * We could not find the channel. Can happen if cpu hotplug
153 * happens while tearing down.
154 */
155 DBG3("Could not find channel");
156 break;
157 case -ENOMEM:
158 case -EINVAL:
159 default:
160 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
161 break;
162 }
163 goto error;
164 }
165
166 stream->chan = channel;
167
168error:
169 if (_alloc_ret) {
170 *_alloc_ret = alloc_ret;
171 }
172 return stream;
173}
174
175/*
176 * Send the given stream pointer to the corresponding thread.
177 *
178 * Returns 0 on success else a negative value.
179 */
180static int send_stream_to_thread(struct lttng_consumer_stream *stream,
181 struct lttng_consumer_local_data *ctx)
182{
183 int ret, stream_pipe;
184
185 /* Get the right pipe where the stream will be sent. */
186 if (stream->metadata_flag) {
187 stream_pipe = ctx->consumer_metadata_pipe[1];
188 } else {
189 stream_pipe = ctx->consumer_data_pipe[1];
190 }
191
192 do {
193 ret = write(stream_pipe, &stream, sizeof(stream));
194 } while (ret < 0 && errno == EINTR);
195 if (ret < 0) {
196 PERROR("Consumer write %s stream to pipe %d",
197 stream->metadata_flag ? "metadata" : "data", stream_pipe);
198 }
199
200 return ret;
201}
202
203/*
204 * Search for a relayd object related to the stream. If found, send the stream
205 * to the relayd.
206 *
207 * On success, returns 0 else a negative value.
208 */
209static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
210{
211 int ret = 0;
212 struct consumer_relayd_sock_pair *relayd;
213
214 assert(stream);
215
216 relayd = consumer_find_relayd(stream->net_seq_idx);
217 if (relayd != NULL) {
218 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
219 /* Add stream on the relayd */
220 ret = relayd_add_stream(&relayd->control_sock, stream->name,
221 stream->chan->pathname, &stream->relayd_stream_id);
222 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
223 if (ret < 0) {
224 goto error;
225 }
226 } else if (stream->net_seq_idx != -1) {
227 ERR("Network sequence index %d unknown. Not adding stream.",
228 stream->net_seq_idx);
229 ret = -1;
230 goto error;
231 }
232
233error:
234 return ret;
235}
236
237static int create_ust_streams(struct lttng_consumer_channel *channel,
238 struct lttng_consumer_local_data *ctx)
239{
240 int ret, cpu = 0;
241 struct ustctl_consumer_stream *ustream;
242 struct lttng_consumer_stream *stream;
243
244 assert(channel);
245 assert(ctx);
246
247 /*
248 * While a stream is available from ustctl. When NULL is returned, we've
249 * reached the end of the possible stream for the channel.
250 */
251 while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
252 int wait_fd;
253
254 wait_fd = ustctl_get_wait_fd(ustream);
255
256 /* Allocate consumer stream object. */
257 stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
258 if (!stream) {
259 goto error_alloc;
260 }
261 stream->ustream = ustream;
262 /*
263 * Store it so we can save multiple function calls afterwards since
264 * this value is used heavily in the stream threads. This is UST
265 * specific so this is why it's done after allocation.
266 */
267 stream->wait_fd = wait_fd;
268
269 /*
270 * Order is important this is why a list is used. On error, the caller
271 * should clean this list.
272 */
273 cds_list_add_tail(&stream->send_node, &channel->streams.head);
274
275 ret = ustctl_get_max_subbuf_size(stream->ustream,
276 &stream->max_sb_size);
277 if (ret < 0) {
278 ERR("ustctl_get_max_subbuf_size failed for stream %s",
279 stream->name);
280 goto error;
281 }
282
283 /* Do actions once stream has been received. */
284 if (ctx->on_recv_stream) {
285 ret = ctx->on_recv_stream(stream);
286 if (ret < 0) {
287 goto error;
288 }
289 }
290
291 DBG("UST consumer add stream %s (key: %d) with relayd id %" PRIu64,
292 stream->name, stream->key, stream->relayd_stream_id);
293
294 /* Set next CPU stream. */
295 channel->streams.count = ++cpu;
296 }
297
298 return 0;
299
300error:
301error_alloc:
302 return ret;
303}
304
305/*
306 * Create an UST channel with the given attributes and send it to the session
307 * daemon using the ust ctl API.
308 *
309 * Return 0 on success or else a negative value.
310 */
311static int create_ust_channel(struct ustctl_consumer_channel_attr *attr,
312 struct ustctl_consumer_channel **chanp)
313{
314 int ret;
315 struct ustctl_consumer_channel *channel;
316
317 assert(attr);
318 assert(chanp);
319
320 DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
321 "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
322 "switch_timer_interval: %u, read_timer_interval: %u, "
323 "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
324 attr->num_subbuf, attr->switch_timer_interval,
325 attr->read_timer_interval, attr->output, attr->type);
326
327 channel = ustctl_create_channel(attr);
328 if (!channel) {
329 ret = -1;
330 goto error_create;
331 }
332
333 *chanp = channel;
334
335 return 0;
336
337error_create:
338 return ret;
339}
340
341static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
342{
343 int ret;
344
345 assert(stream);
346 assert(sock >= 0);
347
348 DBG2("UST consumer sending stream %d to sessiond", stream->key);
349
350 /* Send stream to session daemon. */
351 ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
352 if (ret < 0) {
353 goto error;
354 }
355
356 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
357 if (ret < 0) {
358 goto error;
359 }
360
361error:
362 return ret;
363}
364
365/*
366 * Send channel to sessiond.
367 *
368 * Return 0 on success or else a negative value. On error, the channel is
369 * destroy using ustctl.
370 */
371static int send_sessiond_channel(int sock,
372 struct lttng_consumer_channel *channel,
373 struct lttng_consumer_local_data *ctx, int *relayd_error)
374{
375 int ret;
376 struct lttng_consumer_stream *stream;
377
378 assert(channel);
379 assert(ctx);
380 assert(sock >= 0);
381
382 DBG("UST consumer sending channel %s to sessiond", channel->name);
383
384 /* Send channel to sessiond. */
385 ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
386 if (ret < 0) {
387 goto error;
388 }
389
390 /* The channel was sent successfully to the sessiond at this point. */
391 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
392 /* Try to send the stream to the relayd if one is available. */
393 ret = send_stream_to_relayd(stream);
394 if (ret < 0) {
395 /*
396 * Flag that the relayd was the problem here probably due to a
397 * communicaton error on the socket.
398 */
399 if (relayd_error) {
400 *relayd_error = 1;
401 }
402 goto error;
403 }
404
405 /* Send stream to session daemon. */
406 ret = send_sessiond_stream(sock, stream);
407 if (ret < 0) {
408 goto error;
409 }
410 }
411
412 /* Tell sessiond there is no more stream. */
413 ret = ustctl_send_stream_to_sessiond(sock, NULL);
414 if (ret < 0) {
415 goto error;
416 }
417
418 DBG("UST consumer NULL stream sent to sessiond");
419
420 return 0;
421
422error:
423 return ret;
424}
425
426/*
427 * Creates a channel and streams and add the channel it to the channel internal
428 * state. The created stream must ONLY be sent once the GET_CHANNEL command is
429 * received.
430 *
431 * Return 0 on success or else, a negative value is returned and the channel
432 * MUST be destroyed by consumer_del_channel().
433 */
434static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
435 struct lttng_consumer_channel *channel,
436 struct ustctl_consumer_channel_attr *attr)
3bd1e081
MD
437{
438 int ret;
439
ffe60014
DG
440 assert(ctx);
441 assert(channel);
442 assert(attr);
443
444 /*
445 * This value is still used by the kernel consumer since for the kernel,
446 * the stream ownership is not IN the consumer so we need to have the
447 * number of left stream that needs to be initialized so we can know when
448 * to delete the channel (see consumer.c).
449 *
450 * As for the user space tracer now, the consumer creates and sends the
451 * stream to the session daemon which only sends them to the application
452 * once every stream of a channel is received making this value useless
453 * because we they will be added to the poll thread before the application
454 * receives them. This ensures that a stream can not hang up during
455 * initilization of a channel.
456 */
457 channel->nb_init_stream_left = 0;
458
459 /* The reply msg status is handled in the following call. */
460 ret = create_ust_channel(attr, &channel->uchan);
461 if (ret < 0) {
462 goto error;
3bd1e081
MD
463 }
464
ffe60014
DG
465 /* Open all streams for this channel. */
466 ret = create_ust_streams(channel, ctx);
467 if (ret < 0) {
468 goto error;
469 }
470
471error:
3bd1e081
MD
472 return ret;
473}
474
4cbc1a04
DG
475/*
476 * Receive command from session daemon and process it.
477 *
478 * Return 1 on success else a negative value or 0.
479 */
3bd1e081
MD
480int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
481 int sock, struct pollfd *consumer_sockpoll)
482{
483 ssize_t ret;
f50f23d9 484 enum lttng_error_code ret_code = LTTNG_OK;
3bd1e081 485 struct lttcomm_consumer_msg msg;
ffe60014 486 struct lttng_consumer_channel *channel = NULL;
3bd1e081
MD
487
488 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
489 if (ret != sizeof(msg)) {
173af62f
DG
490 DBG("Consumer received unexpected message size %zd (expects %zu)",
491 ret, sizeof(msg));
ffe60014 492 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
3be74084
DG
493 /*
494 * The ret value might 0 meaning an orderly shutdown but this is ok
495 * since the caller handles this.
496 */
3bd1e081
MD
497 return ret;
498 }
499 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
f50f23d9
DG
500 /*
501 * Notify the session daemon that the command is completed.
502 *
503 * On transport layer error, the function call will print an error
504 * message so handling the returned code is a bit useless since we
505 * return an error code anyway.
506 */
507 (void) consumer_send_status_msg(sock, ret_code);
3bd1e081
MD
508 return -ENOENT;
509 }
510
3f8e211f 511 /* relayd needs RCU read-side lock */
b0b335c8
MD
512 rcu_read_lock();
513
3bd1e081 514 switch (msg.cmd_type) {
00e2e675
DG
515 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
516 {
f50f23d9 517 /* Session daemon status message are handled in the following call. */
7735ef9e
DG
518 ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
519 msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
46e6455f 520 &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
00e2e675
DG
521 goto end_nosignal;
522 }
173af62f
DG
523 case LTTNG_CONSUMER_DESTROY_RELAYD:
524 {
a6ba4fe1 525 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
173af62f
DG
526 struct consumer_relayd_sock_pair *relayd;
527
a6ba4fe1 528 DBG("UST consumer destroying relayd %" PRIu64, index);
173af62f
DG
529
530 /* Get relayd reference if exists. */
a6ba4fe1 531 relayd = consumer_find_relayd(index);
173af62f 532 if (relayd == NULL) {
3448e266 533 DBG("Unable to find relayd %" PRIu64, index);
f50f23d9 534 ret_code = LTTNG_ERR_NO_CONSUMER;
173af62f
DG
535 }
536
a6ba4fe1
DG
537 /*
538 * Each relayd socket pair has a refcount of stream attached to it
539 * which tells if the relayd is still active or not depending on the
540 * refcount value.
541 *
542 * This will set the destroy flag of the relayd object and destroy it
543 * if the refcount reaches zero when called.
544 *
545 * The destroy can happen either here or when a stream fd hangs up.
546 */
f50f23d9
DG
547 if (relayd) {
548 consumer_flag_relayd_for_destroy(relayd);
549 }
550
551 ret = consumer_send_status_msg(sock, ret_code);
552 if (ret < 0) {
553 /* Somehow, the session daemon is not responding anymore. */
554 goto end_nosignal;
555 }
173af62f 556
3f8e211f 557 goto end_nosignal;
173af62f 558 }
3bd1e081
MD
559 case LTTNG_CONSUMER_UPDATE_STREAM:
560 {
3f8e211f 561 rcu_read_unlock();
7ad0a0cb 562 return -ENOSYS;
3bd1e081 563 }
6d805429 564 case LTTNG_CONSUMER_DATA_PENDING:
53632229 565 {
3be74084 566 int ret, is_data_pending;
6d805429 567 uint64_t id = msg.u.data_pending.session_id;
ca22feea 568
6d805429 569 DBG("UST consumer data pending command for id %" PRIu64, id);
ca22feea 570
3be74084 571 is_data_pending = consumer_data_pending(id);
ca22feea
DG
572
573 /* Send back returned value to session daemon */
3be74084
DG
574 ret = lttcomm_send_unix_sock(sock, &is_data_pending,
575 sizeof(is_data_pending));
ca22feea 576 if (ret < 0) {
3be74084 577 DBG("Error when sending the data pending ret code: %d", ret);
ca22feea 578 }
f50f23d9
DG
579
580 /*
581 * No need to send back a status message since the data pending
582 * returned value is the response.
583 */
ca22feea 584 break;
53632229 585 }
ffe60014
DG
586 case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
587 {
588 int ret;
589 struct ustctl_consumer_channel_attr attr;
590
591 /* Create a plain object and reserve a channel key. */
592 channel = allocate_channel(msg.u.ask_channel.session_id,
593 msg.u.ask_channel.pathname, msg.u.ask_channel.name,
594 msg.u.ask_channel.uid, msg.u.ask_channel.gid,
595 msg.u.ask_channel.relayd_id, msg.u.ask_channel.key,
596 (enum lttng_event_output) msg.u.ask_channel.output);
597 if (!channel) {
598 goto end_channel_error;
599 }
600
601 /* Build channel attributes from received message. */
602 attr.subbuf_size = msg.u.ask_channel.subbuf_size;
603 attr.num_subbuf = msg.u.ask_channel.num_subbuf;
604 attr.overwrite = msg.u.ask_channel.overwrite;
605 attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
606 attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
607 memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
608
609 /* Translate the event output type to UST. */
610 switch (channel->output) {
611 case LTTNG_EVENT_SPLICE:
612 /* Splice not supported so fallback on mmap(). */
613 case LTTNG_EVENT_MMAP:
614 default:
615 attr.output = CONSUMER_CHANNEL_MMAP;
616 break;
617 };
618
619 /* Translate and save channel type. */
620 switch (msg.u.ask_channel.type) {
621 case LTTNG_UST_CHAN_PER_CPU:
622 channel->type = CONSUMER_CHANNEL_TYPE_DATA;
623 attr.type = LTTNG_UST_CHAN_PER_CPU;
624 break;
625 case LTTNG_UST_CHAN_METADATA:
626 channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
627 attr.type = LTTNG_UST_CHAN_METADATA;
628 break;
629 default:
630 assert(0);
631 goto error_fatal;
632 };
633
634 ret = ask_channel(ctx, sock, channel, &attr);
635 if (ret < 0) {
636 goto end_channel_error;
637 }
638
639 /*
640 * Add the channel to the internal state AFTER all streams were created
641 * and successfully sent to session daemon. This way, all streams must
642 * be ready before this channel is visible to the threads.
643 */
644 ret = add_channel(channel, ctx);
645 if (ret < 0) {
646 goto end_channel_error;
647 }
648
649 /*
650 * Channel and streams are now created. Inform the session daemon that
651 * everything went well and should wait to receive the channel and
652 * streams with ustctl API.
653 */
654 ret = consumer_send_status_channel(sock, channel);
655 if (ret < 0) {
656 /*
657 * There is probably a problem on the socket so the poll will get
658 * it and clean everything up.
659 */
660 goto end_nosignal;
661 }
662
663 break;
664 }
665 case LTTNG_CONSUMER_GET_CHANNEL:
666 {
667 int ret, relayd_err = 0;
668 unsigned long key = msg.u.get_channel.key;
669 struct lttng_consumer_channel *channel;
670 struct lttng_consumer_stream *stream, *stmp;
671
672 channel = consumer_find_channel(key);
673 if (!channel) {
674 ERR("UST consumer get channel key %lu not found", key);
675 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
676 goto end_msg_sessiond;
677 }
678
679 /* Inform sessiond that we are about to send channel and streams. */
680 ret = consumer_send_status_msg(sock, LTTNG_OK);
681 if (ret < 0) {
682 /* Somehow, the session daemon is not responding anymore. */
683 goto end_nosignal;
684 }
685
686 /* Send everything to sessiond. */
687 ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
688 if (ret < 0) {
689 if (relayd_err) {
690 /*
691 * We were unable to send to the relayd the stream so avoid
692 * sending back a fatal error to the thread since this is OK
693 * and the consumer can continue its work.
694 */
695 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
696 goto end_msg_sessiond;
697 }
698 /*
699 * The communicaton was broken hence there is a bad state between
700 * the consumer and sessiond so stop everything.
701 */
702 goto error_fatal;
703 }
704
705 /* Send streams to the corresponding thread. */
706 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
707 send_node) {
708 /* Sending the stream to the thread. */
709 ret = send_stream_to_thread(stream, ctx);
710 if (ret < 0) {
711 /*
712 * If we are unable to send the stream to the thread, there is
713 * a big problem so just stop everything.
714 */
715 goto error_fatal;
716 }
717
718 /* Remove node from the channel stream list. */
719 cds_list_del(&stream->send_node);
720 }
721
722 /* List MUST be empty after or else it could be reused. */
723 assert(cds_list_empty(&channel->streams.head));
724
725 /* Inform sessiond that everything is done and OK on our side. */
726 ret = consumer_send_status_msg(sock, LTTNG_OK);
727 if (ret < 0) {
728 /* Somehow, the session daemon is not responding anymore. */
729 goto end_nosignal;
730 }
731
732 break;
733 }
734 case LTTNG_CONSUMER_DESTROY_CHANNEL:
735 {
736 int ret;
737 unsigned long key = msg.u.destroy_channel.key;
738 struct lttng_consumer_channel *channel;
739
740 DBG("UST consumer destroy channel key %lu", key);
741
742 channel = consumer_find_channel(key);
743 if (!channel) {
744 ERR("UST consumer destroy channel %lu not found", key);
745 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
746 } else {
747 /* Protocol error if the stream list is NOT empty. */
748 assert(!cds_list_empty(&channel->streams.head));
749 consumer_del_channel(channel);
750 }
751
752 ret = consumer_send_status_msg(sock, LTTNG_OK);
753 if (ret < 0) {
754 /* Somehow, the session daemon is not responding anymore. */
755 goto end_nosignal;
756 }
757 }
3bd1e081
MD
758 default:
759 break;
760 }
3f8e211f 761
3bd1e081 762end_nosignal:
b0b335c8 763 rcu_read_unlock();
4cbc1a04
DG
764
765 /*
766 * Return 1 to indicate success since the 0 value can be a socket
767 * shutdown during the recv() or send() call.
768 */
769 return 1;
ffe60014
DG
770
771end_msg_sessiond:
772 /*
773 * The returned value here is not useful since either way we'll return 1 to
774 * the caller because the session daemon socket management is done
775 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
776 */
777 (void) consumer_send_status_msg(sock, ret_code);
778 rcu_read_unlock();
779 return 1;
780end_channel_error:
781 if (channel) {
782 /*
783 * Free channel here since no one has a reference to it. We don't
784 * free after that because a stream can store this pointer.
785 */
786 destroy_channel(channel);
787 }
788 /* We have to send a status channel message indicating an error. */
789 ret = consumer_send_status_channel(sock, NULL);
790 if (ret < 0) {
791 /* Stop everything if session daemon can not be notified. */
792 goto error_fatal;
793 }
794 rcu_read_unlock();
795 return 1;
796error_fatal:
797 rcu_read_unlock();
798 /* This will issue a consumer stop. */
799 return -1;
3bd1e081
MD
800}
801
ffe60014
DG
802/*
803 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
804 * compiled out, we isolate it in this library.
805 */
806int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
807 unsigned long *off)
3bd1e081 808{
ffe60014
DG
809 assert(stream);
810 assert(stream->ustream);
b5c5fc29 811
ffe60014 812 return ustctl_get_mmap_read_offset(stream->ustream, off);
3bd1e081
MD
813}
814
ffe60014
DG
815/*
816 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
817 * compiled out, we isolate it in this library.
818 */
819void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
d056b477 820{
ffe60014
DG
821 assert(stream);
822 assert(stream->ustream);
823
824 return ustctl_get_mmap_base(stream->ustream);
d056b477
MD
825}
826
ffe60014
DG
827/*
828 * Take a snapshot for a specific fd
829 *
830 * Returns 0 on success, < 0 on error
831 */
832int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 833{
ffe60014
DG
834 assert(stream);
835 assert(stream->ustream);
836
837 return ustctl_snapshot(stream->ustream);
3bd1e081
MD
838}
839
ffe60014
DG
840/*
841 * Get the produced position
842 *
843 * Returns 0 on success, < 0 on error
844 */
845int lttng_ustconsumer_get_produced_snapshot(
846 struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081 847{
ffe60014
DG
848 assert(stream);
849 assert(stream->ustream);
850 assert(pos);
7a57cf92 851
ffe60014
DG
852 return ustctl_snapshot_get_produced(stream->ustream, pos);
853}
7a57cf92 854
ffe60014
DG
855/*
856 * Called when the stream signal the consumer that it has hang up.
857 */
858void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
859{
860 assert(stream);
861 assert(stream->ustream);
2c1dd183 862
ffe60014
DG
863 ustctl_flush_buffer(stream->ustream, 0);
864 stream->hangup_flush_done = 1;
865}
ee77a7b0 866
ffe60014
DG
867void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
868{
869 assert(chan);
870 assert(chan->uchan);
e316aad5 871
ffe60014 872 ustctl_destroy_channel(chan->uchan);
3bd1e081
MD
873}
874
875void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
876{
ffe60014
DG
877 assert(stream);
878 assert(stream->ustream);
d41f73b7 879
ffe60014
DG
880 ustctl_destroy_stream(stream->ustream);
881}
d41f73b7
MD
882
883int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
884 struct lttng_consumer_local_data *ctx)
885{
1d4dfdef 886 unsigned long len, subbuf_size, padding;
d41f73b7
MD
887 int err;
888 long ret = 0;
d41f73b7 889 char dummy;
ffe60014
DG
890 struct ustctl_consumer_stream *ustream;
891
892 assert(stream);
893 assert(stream->ustream);
894 assert(ctx);
d41f73b7 895
ffe60014
DG
896 DBG2("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
897 stream->name);
898
899 /* Ease our life for what's next. */
900 ustream = stream->ustream;
d41f73b7
MD
901
902 /* We can consume the 1 byte written into the wait_fd by UST */
effcf122 903 if (!stream->hangup_flush_done) {
c617c0c6
MD
904 ssize_t readlen;
905
effcf122
MD
906 do {
907 readlen = read(stream->wait_fd, &dummy, 1);
87dc6a9c 908 } while (readlen == -1 && errno == EINTR);
effcf122
MD
909 if (readlen == -1) {
910 ret = readlen;
911 goto end;
912 }
d41f73b7
MD
913 }
914
d41f73b7 915 /* Get the next subbuffer */
ffe60014 916 err = ustctl_get_next_subbuf(ustream);
d41f73b7 917 if (err != 0) {
1d4dfdef 918 ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
d41f73b7
MD
919 /*
920 * This is a debug message even for single-threaded consumer,
921 * because poll() have more relaxed criterions than get subbuf,
922 * so get_subbuf may fail for short race windows where poll()
923 * would issue wakeups.
924 */
925 DBG("Reserving sub buffer failed (everything is normal, "
ffe60014 926 "it is due to concurrency) [ret: %d]", err);
d41f73b7
MD
927 goto end;
928 }
ffe60014 929 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
1d4dfdef 930 /* Get the full padded subbuffer size */
ffe60014 931 err = ustctl_get_padded_subbuf_size(ustream, &len);
effcf122 932 assert(err == 0);
1d4dfdef
DG
933
934 /* Get subbuffer data size (without padding) */
ffe60014 935 err = ustctl_get_subbuf_size(ustream, &subbuf_size);
1d4dfdef
DG
936 assert(err == 0);
937
938 /* Make sure we don't get a subbuffer size bigger than the padded */
939 assert(len >= subbuf_size);
940
941 padding = len - subbuf_size;
d41f73b7 942 /* write the subbuffer to the tracefile */
1d4dfdef 943 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding);
91dfef6e
DG
944 /*
945 * The mmap operation should write subbuf_size amount of data when network
946 * streaming or the full padding (len) size when we are _not_ streaming.
947 */
948 if ((ret != subbuf_size && stream->net_seq_idx != -1) ||
949 (ret != len && stream->net_seq_idx == -1)) {
d41f73b7 950 /*
91dfef6e 951 * Display the error but continue processing to try to release the
c5c45efa
DG
952 * subbuffer. This is a DBG statement since any unexpected kill or
953 * signal, the application gets unregistered, relayd gets closed or
954 * anything that affects the buffer lifetime will trigger this error.
955 * So, for the sake of the user, don't print this error since it can
956 * happen and it is OK with the code flow.
d41f73b7 957 */
c5c45efa 958 DBG("Error writing to tracefile "
91dfef6e
DG
959 "(ret: %zd != len: %lu != subbuf_size: %lu)",
960 ret, len, subbuf_size);
d41f73b7 961 }
ffe60014 962 err = ustctl_put_next_subbuf(ustream);
effcf122 963 assert(err == 0);
d41f73b7
MD
964end:
965 return ret;
966}
967
ffe60014
DG
968/*
969 * Called when a stream is created.
970 */
d41f73b7
MD
971int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
972{
973 int ret;
ffe60014 974 char full_path[PATH_MAX];
d41f73b7
MD
975
976 /* Opening the tracefile in write mode */
ffe60014
DG
977 if (stream->net_seq_idx != -1) {
978 goto end;
d41f73b7
MD
979 }
980
ffe60014
DG
981 ret = snprintf(full_path, sizeof(full_path), "%s/%s",
982 stream->chan->pathname, stream->name);
983 if (ret < 0) {
984 PERROR("snprintf on_recv_stream");
985 goto error;
986 }
987
988 ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC,
989 S_IRWXU | S_IRWXG | S_IRWXO, stream->uid, stream->gid);
990 if (ret < 0) {
991 PERROR("open stream path %s", full_path);
c869f647
DG
992 goto error;
993 }
ffe60014 994 stream->out_fd = ret;
c869f647 995
ffe60014 996end:
d41f73b7
MD
997 /* we return 0 to let the library handle the FD internally */
998 return 0;
999
1000error:
1001 return ret;
1002}
ca22feea
DG
1003
1004/*
1005 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
1006 * stream. Consumer data lock MUST be acquired before calling this function
1007 * and the stream lock.
ca22feea 1008 *
6d805429 1009 * Return 1 if the traced data are still getting read else 0 meaning that the
ca22feea
DG
1010 * data is available for trace viewer reading.
1011 */
6d805429 1012int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
1013{
1014 int ret;
1015
1016 assert(stream);
ffe60014 1017 assert(stream->ustream);
ca22feea 1018
6d805429 1019 DBG("UST consumer checking data pending");
c8f59ee5 1020
ffe60014 1021 ret = ustctl_get_next_subbuf(stream->ustream);
ca22feea
DG
1022 if (ret == 0) {
1023 /* There is still data so let's put back this subbuffer. */
ffe60014 1024 ret = ustctl_put_subbuf(stream->ustream);
ca22feea 1025 assert(ret == 0);
6d805429 1026 ret = 1; /* Data is pending */
4e9a4686 1027 goto end;
ca22feea
DG
1028 }
1029
6d805429
DG
1030 /* Data is NOT pending so ready to be read. */
1031 ret = 0;
ca22feea 1032
6efae65e
DG
1033end:
1034 return ret;
ca22feea 1035}
This page took 0.085542 seconds and 5 git commands to generate.