Network streaming support
[lttng-tools.git] / src / common / consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
00e2e675 4 * 2012 - David Goulet <dgoulet@efficios.com>
3bd1e081 5 *
d14d33bf
AM
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
3bd1e081 9 *
d14d33bf
AM
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
3bd1e081 14 *
d14d33bf
AM
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
18 */
19
20#define _GNU_SOURCE
21#include <assert.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
30
990570ed 31#include <common/common.h>
10a8a223 32#include <common/kernel-ctl/kernel-ctl.h>
00e2e675 33#include <common/sessiond-comm/relayd.h>
10a8a223
DG
34#include <common/sessiond-comm/sessiond-comm.h>
35#include <common/kernel-consumer/kernel-consumer.h>
00e2e675 36#include <common/relayd/relayd.h>
10a8a223
DG
37#include <common/ust-consumer/ust-consumer.h>
38
39#include "consumer.h"
3bd1e081
MD
40
41struct lttng_consumer_global_data consumer_data = {
3bd1e081
MD
42 .stream_count = 0,
43 .need_update = 1,
44 .type = LTTNG_CONSUMER_UNKNOWN,
45};
46
47/* timeout parameter, to control the polling thread grace period. */
48int consumer_poll_timeout = -1;
49
50/*
51 * Flag to inform the polling thread to quit when all fd hung up. Updated by
52 * the consumer_thread_receive_fds when it notices that all fds has hung up.
53 * Also updated by the signal handler (consumer_should_exit()). Read by the
54 * polling threads.
55 */
56volatile int consumer_quit = 0;
57
58/*
59 * Find a stream. The consumer_data.lock must be locked during this
60 * call.
61 */
62static struct lttng_consumer_stream *consumer_find_stream(int key)
63{
e4421fec
DG
64 struct lttng_ht_iter iter;
65 struct lttng_ht_node_ulong *node;
66 struct lttng_consumer_stream *stream = NULL;
3bd1e081 67
7ad0a0cb
MD
68 /* Negative keys are lookup failures */
69 if (key < 0)
70 return NULL;
e4421fec 71
6065ceec
DG
72 rcu_read_lock();
73
e4421fec
DG
74 lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
75 &iter);
76 node = lttng_ht_iter_get_node_ulong(&iter);
77 if (node != NULL) {
78 stream = caa_container_of(node, struct lttng_consumer_stream, node);
3bd1e081 79 }
e4421fec 80
6065ceec
DG
81 rcu_read_unlock();
82
e4421fec 83 return stream;
3bd1e081
MD
84}
85
7ad0a0cb
MD
86static void consumer_steal_stream_key(int key)
87{
88 struct lttng_consumer_stream *stream;
89
04253271 90 rcu_read_lock();
7ad0a0cb 91 stream = consumer_find_stream(key);
04253271 92 if (stream) {
7ad0a0cb 93 stream->key = -1;
04253271
MD
94 /*
95 * We don't want the lookup to match, but we still need
96 * to iterate on this stream when iterating over the hash table. Just
97 * change the node key.
98 */
99 stream->node.key = -1;
100 }
101 rcu_read_unlock();
7ad0a0cb
MD
102}
103
3bd1e081
MD
104static struct lttng_consumer_channel *consumer_find_channel(int key)
105{
e4421fec
DG
106 struct lttng_ht_iter iter;
107 struct lttng_ht_node_ulong *node;
108 struct lttng_consumer_channel *channel = NULL;
3bd1e081 109
7ad0a0cb
MD
110 /* Negative keys are lookup failures */
111 if (key < 0)
112 return NULL;
e4421fec 113
6065ceec
DG
114 rcu_read_lock();
115
e4421fec
DG
116 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
117 &iter);
118 node = lttng_ht_iter_get_node_ulong(&iter);
119 if (node != NULL) {
120 channel = caa_container_of(node, struct lttng_consumer_channel, node);
3bd1e081 121 }
e4421fec 122
6065ceec
DG
123 rcu_read_unlock();
124
e4421fec 125 return channel;
3bd1e081
MD
126}
127
7ad0a0cb
MD
128static void consumer_steal_channel_key(int key)
129{
130 struct lttng_consumer_channel *channel;
131
04253271 132 rcu_read_lock();
7ad0a0cb 133 channel = consumer_find_channel(key);
04253271 134 if (channel) {
7ad0a0cb 135 channel->key = -1;
04253271
MD
136 /*
137 * We don't want the lookup to match, but we still need
138 * to iterate on this channel when iterating over the hash table. Just
139 * change the node key.
140 */
141 channel->node.key = -1;
142 }
143 rcu_read_unlock();
7ad0a0cb
MD
144}
145
702b1ea4
MD
146static
147void consumer_free_stream(struct rcu_head *head)
148{
149 struct lttng_ht_node_ulong *node =
150 caa_container_of(head, struct lttng_ht_node_ulong, head);
151 struct lttng_consumer_stream *stream =
152 caa_container_of(node, struct lttng_consumer_stream, node);
153
154 free(stream);
155}
156
00e2e675
DG
157/*
158 * RCU protected relayd socket pair free.
159 */
160static void consumer_rcu_free_relayd(struct rcu_head *head)
161{
162 struct lttng_ht_node_ulong *node =
163 caa_container_of(head, struct lttng_ht_node_ulong, head);
164 struct consumer_relayd_sock_pair *relayd =
165 caa_container_of(node, struct consumer_relayd_sock_pair, node);
166
167 free(relayd);
168}
169
170/*
171 * Destroy and free relayd socket pair object.
172 *
173 * This function MUST be called with the consumer_data lock acquired.
174 */
175void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
176{
177 int ret;
178 struct lttng_ht_iter iter;
179
180 DBG("Consumer destroy and close relayd socket pair");
181
182 iter.iter.node = &relayd->node.node;
183 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
184 assert(!ret);
185
186 /* Close all sockets */
187 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
188 (void) relayd_close(&relayd->control_sock);
189 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
190 (void) relayd_close(&relayd->data_sock);
191
192 /* RCU free() call */
193 call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
194}
195
3bd1e081
MD
196/*
197 * Remove a stream from the global list protected by a mutex. This
198 * function is also responsible for freeing its data structures.
199 */
200void consumer_del_stream(struct lttng_consumer_stream *stream)
201{
202 int ret;
e4421fec 203 struct lttng_ht_iter iter;
3bd1e081 204 struct lttng_consumer_channel *free_chan = NULL;
00e2e675
DG
205 struct consumer_relayd_sock_pair *relayd;
206
207 assert(stream);
3bd1e081
MD
208
209 pthread_mutex_lock(&consumer_data.lock);
210
211 switch (consumer_data.type) {
212 case LTTNG_CONSUMER_KERNEL:
213 if (stream->mmap_base != NULL) {
214 ret = munmap(stream->mmap_base, stream->mmap_len);
215 if (ret != 0) {
216 perror("munmap");
217 }
218 }
219 break;
7753dea8
MD
220 case LTTNG_CONSUMER32_UST:
221 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
222 lttng_ustconsumer_del_stream(stream);
223 break;
224 default:
225 ERR("Unknown consumer_data type");
226 assert(0);
227 goto end;
228 }
229
6065ceec 230 rcu_read_lock();
04253271
MD
231 iter.iter.node = &stream->node.node;
232 ret = lttng_ht_del(consumer_data.stream_ht, &iter);
233 assert(!ret);
e4421fec 234
6065ceec
DG
235 rcu_read_unlock();
236
3bd1e081
MD
237 if (consumer_data.stream_count <= 0) {
238 goto end;
239 }
240 consumer_data.stream_count--;
241 if (!stream) {
242 goto end;
243 }
244 if (stream->out_fd >= 0) {
4c462e79
MD
245 ret = close(stream->out_fd);
246 if (ret) {
247 PERROR("close");
248 }
3bd1e081 249 }
b5c5fc29 250 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
4c462e79
MD
251 ret = close(stream->wait_fd);
252 if (ret) {
253 PERROR("close");
254 }
3bd1e081 255 }
2c1dd183 256 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
4c462e79
MD
257 ret = close(stream->shm_fd);
258 if (ret) {
259 PERROR("close");
260 }
3bd1e081 261 }
00e2e675
DG
262
263 /* Check and cleanup relayd */
264 relayd = consumer_find_relayd(stream->net_seq_idx);
265 if (relayd != NULL) {
266 /* We are about to modify the relayd refcount */
267 rcu_read_lock();
268 if (!--relayd->refcount) {
269 /* Refcount of the relayd struct is 0, destroy it */
270 consumer_destroy_relayd(relayd);
271 }
272 rcu_read_unlock();
273 }
274
275 if (!--stream->chan->refcount) {
3bd1e081 276 free_chan = stream->chan;
00e2e675
DG
277 }
278
702b1ea4
MD
279
280 call_rcu(&stream->node.head, consumer_free_stream);
3bd1e081
MD
281end:
282 consumer_data.need_update = 1;
283 pthread_mutex_unlock(&consumer_data.lock);
284
285 if (free_chan)
286 consumer_del_channel(free_chan);
287}
288
289struct lttng_consumer_stream *consumer_allocate_stream(
290 int channel_key, int stream_key,
291 int shm_fd, int wait_fd,
292 enum lttng_consumer_stream_state state,
293 uint64_t mmap_len,
294 enum lttng_event_output output,
6df2e2c9
MD
295 const char *path_name,
296 uid_t uid,
00e2e675
DG
297 gid_t gid,
298 int net_index,
299 int metadata_flag)
3bd1e081
MD
300{
301 struct lttng_consumer_stream *stream;
302 int ret;
303
effcf122 304 stream = zmalloc(sizeof(*stream));
3bd1e081
MD
305 if (stream == NULL) {
306 perror("malloc struct lttng_consumer_stream");
307 goto end;
308 }
309 stream->chan = consumer_find_channel(channel_key);
310 if (!stream->chan) {
311 perror("Unable to find channel key");
312 goto end;
313 }
314 stream->chan->refcount++;
315 stream->key = stream_key;
316 stream->shm_fd = shm_fd;
317 stream->wait_fd = wait_fd;
318 stream->out_fd = -1;
319 stream->out_fd_offset = 0;
320 stream->state = state;
321 stream->mmap_len = mmap_len;
322 stream->mmap_base = NULL;
323 stream->output = output;
6df2e2c9
MD
324 stream->uid = uid;
325 stream->gid = gid;
00e2e675
DG
326 stream->net_seq_idx = net_index;
327 stream->metadata_flag = metadata_flag;
328 strncpy(stream->path_name, path_name, sizeof(stream->path_name));
329 stream->path_name[sizeof(stream->path_name) - 1] = '\0';
e4421fec 330 lttng_ht_node_init_ulong(&stream->node, stream->key);
00e2e675 331 lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
3bd1e081
MD
332
333 switch (consumer_data.type) {
334 case LTTNG_CONSUMER_KERNEL:
335 break;
7753dea8
MD
336 case LTTNG_CONSUMER32_UST:
337 case LTTNG_CONSUMER64_UST:
5af2f756 338 stream->cpu = stream->chan->cpucount++;
3bd1e081
MD
339 ret = lttng_ustconsumer_allocate_stream(stream);
340 if (ret) {
341 free(stream);
342 return NULL;
343 }
344 break;
345 default:
346 ERR("Unknown consumer_data type");
347 assert(0);
348 goto end;
349 }
00e2e675 350 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
3bd1e081
MD
351 stream->path_name, stream->key,
352 stream->shm_fd,
353 stream->wait_fd,
354 (unsigned long long) stream->mmap_len,
00e2e675
DG
355 stream->out_fd,
356 stream->net_seq_idx);
3bd1e081
MD
357end:
358 return stream;
359}
360
361/*
362 * Add a stream to the global list protected by a mutex.
363 */
364int consumer_add_stream(struct lttng_consumer_stream *stream)
365{
366 int ret = 0;
c77fc10a
DG
367 struct lttng_ht_node_ulong *node;
368 struct lttng_ht_iter iter;
00e2e675 369 struct consumer_relayd_sock_pair *relayd;
3bd1e081
MD
370
371 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
372 /* Steal stream identifier, for UST */
373 consumer_steal_stream_key(stream->key);
6065ceec 374 rcu_read_lock();
c77fc10a
DG
375
376 lttng_ht_lookup(consumer_data.stream_ht,
377 (void *)((unsigned long) stream->key), &iter);
378 node = lttng_ht_iter_get_node_ulong(&iter);
379 if (node != NULL) {
380 rcu_read_unlock();
381 /* Stream already exist. Ignore the insertion */
382 goto end;
383 }
384
04253271 385 lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
6065ceec 386 rcu_read_unlock();
00e2e675
DG
387
388 /* Check and cleanup relayd */
389 relayd = consumer_find_relayd(stream->net_seq_idx);
390 if (relayd != NULL) {
391 /* We are about to modify the relayd refcount */
392 rcu_read_lock();
393 relayd->refcount++;
394 rcu_read_unlock();
395 }
396
397 /* Update consumer data */
3bd1e081
MD
398 consumer_data.stream_count++;
399 consumer_data.need_update = 1;
400
401 switch (consumer_data.type) {
402 case LTTNG_CONSUMER_KERNEL:
403 break;
7753dea8
MD
404 case LTTNG_CONSUMER32_UST:
405 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
406 /* Streams are in CPU number order (we rely on this) */
407 stream->cpu = stream->chan->nr_streams++;
408 break;
409 default:
410 ERR("Unknown consumer_data type");
411 assert(0);
412 goto end;
413 }
414
415end:
416 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 417
3bd1e081
MD
418 return ret;
419}
420
00e2e675
DG
421/*
422 * Add relayd socket to global consumer data hashtable.
423 */
424int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
425{
426 int ret = 0;
427 struct lttng_ht_node_ulong *node;
428 struct lttng_ht_iter iter;
429
430 if (relayd == NULL) {
431 ret = -1;
432 goto end;
433 }
434
435 rcu_read_lock();
436
437 lttng_ht_lookup(consumer_data.relayd_ht,
438 (void *)((unsigned long) relayd->net_seq_idx), &iter);
439 node = lttng_ht_iter_get_node_ulong(&iter);
440 if (node != NULL) {
441 rcu_read_unlock();
442 /* Relayd already exist. Ignore the insertion */
443 goto end;
444 }
445 lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
446
447 rcu_read_unlock();
448
449end:
450 return ret;
451}
452
453/*
454 * Allocate and return a consumer relayd socket.
455 */
456struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
457 int net_seq_idx)
458{
459 struct consumer_relayd_sock_pair *obj = NULL;
460
461 /* Negative net sequence index is a failure */
462 if (net_seq_idx < 0) {
463 goto error;
464 }
465
466 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
467 if (obj == NULL) {
468 PERROR("zmalloc relayd sock");
469 goto error;
470 }
471
472 obj->net_seq_idx = net_seq_idx;
473 obj->refcount = 0;
474 lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
475 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
476
477error:
478 return obj;
479}
480
481/*
482 * Find a relayd socket pair in the global consumer data.
483 *
484 * Return the object if found else NULL.
485 */
486struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
487{
488 struct lttng_ht_iter iter;
489 struct lttng_ht_node_ulong *node;
490 struct consumer_relayd_sock_pair *relayd = NULL;
491
492 /* Negative keys are lookup failures */
493 if (key < 0) {
494 goto error;
495 }
496
497 rcu_read_lock();
498
499 lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
500 &iter);
501 node = lttng_ht_iter_get_node_ulong(&iter);
502 if (node != NULL) {
503 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
504 }
505
506 rcu_read_unlock();
507
508error:
509 return relayd;
510}
511
512/*
513 * Handle stream for relayd transmission if the stream applies for network
514 * streaming where the net sequence index is set.
515 *
516 * Return destination file descriptor or negative value on error.
517 */
518int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
519 size_t data_size)
520{
521 int outfd = -1, ret;
522 struct consumer_relayd_sock_pair *relayd;
523 struct lttcomm_relayd_data_hdr data_hdr;
524
525 /* Safety net */
526 assert(stream);
527
528 /* Reset data header */
529 memset(&data_hdr, 0, sizeof(data_hdr));
530
531 /* Get relayd reference of the stream. */
532 relayd = consumer_find_relayd(stream->net_seq_idx);
533 if (relayd == NULL) {
534 /* Stream is either local or corrupted */
535 goto error;
536 }
537
538 DBG("Consumer found relayd socks with index %d", stream->net_seq_idx);
539 if (stream->metadata_flag) {
540 /* Caller MUST acquire the relayd control socket lock */
541 ret = relayd_send_metadata(&relayd->control_sock, data_size);
542 if (ret < 0) {
543 goto error;
544 }
545
546 /* Metadata are always sent on the control socket. */
547 outfd = relayd->control_sock.fd;
548 } else {
549 /* Set header with stream information */
550 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
551 data_hdr.data_size = htobe32(data_size);
552 /* Other fields are zeroed previously */
553
554 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
555 sizeof(data_hdr));
556 if (ret < 0) {
557 goto error;
558 }
559
560 /* Set to go on data socket */
561 outfd = relayd->data_sock.fd;
562 }
563
564error:
565 return outfd;
566}
567
3bd1e081
MD
568/*
569 * Update a stream according to what we just received.
570 */
571void consumer_change_stream_state(int stream_key,
572 enum lttng_consumer_stream_state state)
573{
574 struct lttng_consumer_stream *stream;
575
576 pthread_mutex_lock(&consumer_data.lock);
577 stream = consumer_find_stream(stream_key);
578 if (stream) {
579 stream->state = state;
580 }
581 consumer_data.need_update = 1;
582 pthread_mutex_unlock(&consumer_data.lock);
583}
584
702b1ea4
MD
585static
586void consumer_free_channel(struct rcu_head *head)
587{
588 struct lttng_ht_node_ulong *node =
589 caa_container_of(head, struct lttng_ht_node_ulong, head);
590 struct lttng_consumer_channel *channel =
591 caa_container_of(node, struct lttng_consumer_channel, node);
592
593 free(channel);
594}
595
3bd1e081
MD
596/*
597 * Remove a channel from the global list protected by a mutex. This
598 * function is also responsible for freeing its data structures.
599 */
600void consumer_del_channel(struct lttng_consumer_channel *channel)
601{
602 int ret;
e4421fec 603 struct lttng_ht_iter iter;
3bd1e081
MD
604
605 pthread_mutex_lock(&consumer_data.lock);
606
607 switch (consumer_data.type) {
608 case LTTNG_CONSUMER_KERNEL:
609 break;
7753dea8
MD
610 case LTTNG_CONSUMER32_UST:
611 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
612 lttng_ustconsumer_del_channel(channel);
613 break;
614 default:
615 ERR("Unknown consumer_data type");
616 assert(0);
617 goto end;
618 }
619
6065ceec 620 rcu_read_lock();
04253271
MD
621 iter.iter.node = &channel->node.node;
622 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
623 assert(!ret);
6065ceec
DG
624 rcu_read_unlock();
625
3bd1e081
MD
626 if (channel->mmap_base != NULL) {
627 ret = munmap(channel->mmap_base, channel->mmap_len);
628 if (ret != 0) {
629 perror("munmap");
630 }
631 }
b5c5fc29 632 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
4c462e79
MD
633 ret = close(channel->wait_fd);
634 if (ret) {
635 PERROR("close");
636 }
3bd1e081 637 }
2c1dd183 638 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
4c462e79
MD
639 ret = close(channel->shm_fd);
640 if (ret) {
641 PERROR("close");
642 }
3bd1e081 643 }
702b1ea4
MD
644
645 call_rcu(&channel->node.head, consumer_free_channel);
3bd1e081
MD
646end:
647 pthread_mutex_unlock(&consumer_data.lock);
648}
649
650struct lttng_consumer_channel *consumer_allocate_channel(
651 int channel_key,
652 int shm_fd, int wait_fd,
653 uint64_t mmap_len,
654 uint64_t max_sb_size)
655{
656 struct lttng_consumer_channel *channel;
657 int ret;
658
276b26d1 659 channel = zmalloc(sizeof(*channel));
3bd1e081
MD
660 if (channel == NULL) {
661 perror("malloc struct lttng_consumer_channel");
662 goto end;
663 }
664 channel->key = channel_key;
665 channel->shm_fd = shm_fd;
666 channel->wait_fd = wait_fd;
667 channel->mmap_len = mmap_len;
668 channel->max_sb_size = max_sb_size;
669 channel->refcount = 0;
670 channel->nr_streams = 0;
e4421fec 671 lttng_ht_node_init_ulong(&channel->node, channel->key);
3bd1e081
MD
672
673 switch (consumer_data.type) {
674 case LTTNG_CONSUMER_KERNEL:
675 channel->mmap_base = NULL;
676 channel->mmap_len = 0;
677 break;
7753dea8
MD
678 case LTTNG_CONSUMER32_UST:
679 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
680 ret = lttng_ustconsumer_allocate_channel(channel);
681 if (ret) {
682 free(channel);
683 return NULL;
684 }
685 break;
686 default:
687 ERR("Unknown consumer_data type");
688 assert(0);
689 goto end;
690 }
691 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
00e2e675 692 channel->key, channel->shm_fd, channel->wait_fd,
3bd1e081
MD
693 (unsigned long long) channel->mmap_len,
694 (unsigned long long) channel->max_sb_size);
695end:
696 return channel;
697}
698
699/*
700 * Add a channel to the global list protected by a mutex.
701 */
702int consumer_add_channel(struct lttng_consumer_channel *channel)
703{
c77fc10a
DG
704 struct lttng_ht_node_ulong *node;
705 struct lttng_ht_iter iter;
706
3bd1e081 707 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
708 /* Steal channel identifier, for UST */
709 consumer_steal_channel_key(channel->key);
6065ceec 710 rcu_read_lock();
c77fc10a
DG
711
712 lttng_ht_lookup(consumer_data.channel_ht,
713 (void *)((unsigned long) channel->key), &iter);
714 node = lttng_ht_iter_get_node_ulong(&iter);
715 if (node != NULL) {
716 /* Channel already exist. Ignore the insertion */
717 goto end;
718 }
719
04253271 720 lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
c77fc10a
DG
721
722end:
6065ceec 723 rcu_read_unlock();
3bd1e081 724 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 725
7ad0a0cb 726 return 0;
3bd1e081
MD
727}
728
729/*
730 * Allocate the pollfd structure and the local view of the out fds to avoid
731 * doing a lookup in the linked list and concurrency issues when writing is
732 * needed. Called with consumer_data.lock held.
733 *
734 * Returns the number of fds in the structures.
735 */
736int consumer_update_poll_array(
737 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
00e2e675
DG
738 struct lttng_consumer_stream **local_stream,
739 struct lttng_ht *metadata_ht)
3bd1e081 740{
3bd1e081 741 int i = 0;
e4421fec
DG
742 struct lttng_ht_iter iter;
743 struct lttng_consumer_stream *stream;
3bd1e081
MD
744
745 DBG("Updating poll fd array");
481d6c57 746 rcu_read_lock();
e4421fec
DG
747 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
748 node.node) {
749 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
3bd1e081
MD
750 continue;
751 }
e4421fec
DG
752 DBG("Active FD %d", stream->wait_fd);
753 (*pollfd)[i].fd = stream->wait_fd;
3bd1e081 754 (*pollfd)[i].events = POLLIN | POLLPRI;
00e2e675
DG
755 if (stream->metadata_flag && metadata_ht) {
756 lttng_ht_add_unique_ulong(metadata_ht, &stream->waitfd_node);
757 DBG("Active FD added to metadata hash table");
758 }
e4421fec 759 local_stream[i] = stream;
3bd1e081
MD
760 i++;
761 }
481d6c57 762 rcu_read_unlock();
3bd1e081
MD
763
764 /*
765 * Insert the consumer_poll_pipe at the end of the array and don't
766 * increment i so nb_fd is the number of real FD.
767 */
768 (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
509bb1cf 769 (*pollfd)[i].events = POLLIN | POLLPRI;
3bd1e081
MD
770 return i;
771}
772
773/*
774 * Poll on the should_quit pipe and the command socket return -1 on error and
775 * should exit, 0 if data is available on the command socket
776 */
777int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
778{
779 int num_rdy;
780
88f2b785 781restart:
3bd1e081
MD
782 num_rdy = poll(consumer_sockpoll, 2, -1);
783 if (num_rdy == -1) {
88f2b785
MD
784 /*
785 * Restart interrupted system call.
786 */
787 if (errno == EINTR) {
788 goto restart;
789 }
3bd1e081
MD
790 perror("Poll error");
791 goto exit;
792 }
509bb1cf 793 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
3bd1e081
MD
794 DBG("consumer_should_quit wake up");
795 goto exit;
796 }
797 return 0;
798
799exit:
800 return -1;
801}
802
803/*
804 * Set the error socket.
805 */
806void lttng_consumer_set_error_sock(
807 struct lttng_consumer_local_data *ctx, int sock)
808{
809 ctx->consumer_error_socket = sock;
810}
811
812/*
813 * Set the command socket path.
814 */
3bd1e081
MD
815void lttng_consumer_set_command_sock_path(
816 struct lttng_consumer_local_data *ctx, char *sock)
817{
818 ctx->consumer_command_sock_path = sock;
819}
820
821/*
822 * Send return code to the session daemon.
823 * If the socket is not defined, we return 0, it is not a fatal error
824 */
825int lttng_consumer_send_error(
826 struct lttng_consumer_local_data *ctx, int cmd)
827{
828 if (ctx->consumer_error_socket > 0) {
829 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
830 sizeof(enum lttcomm_sessiond_command));
831 }
832
833 return 0;
834}
835
836/*
837 * Close all the tracefiles and stream fds, should be called when all instances
838 * are destroyed.
839 */
840void lttng_consumer_cleanup(void)
841{
e4421fec 842 struct lttng_ht_iter iter;
6065ceec
DG
843 struct lttng_ht_node_ulong *node;
844
845 rcu_read_lock();
3bd1e081
MD
846
847 /*
6065ceec
DG
848 * close all outfd. Called when there are no more threads running (after
849 * joining on the threads), no need to protect list iteration with mutex.
3bd1e081 850 */
6065ceec
DG
851 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
852 node) {
702b1ea4
MD
853 struct lttng_consumer_stream *stream =
854 caa_container_of(node, struct lttng_consumer_stream, node);
855 consumer_del_stream(stream);
3bd1e081 856 }
e4421fec 857
6065ceec
DG
858 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
859 node) {
702b1ea4
MD
860 struct lttng_consumer_channel *channel =
861 caa_container_of(node, struct lttng_consumer_channel, node);
862 consumer_del_channel(channel);
3bd1e081 863 }
6065ceec
DG
864
865 rcu_read_unlock();
d6ce1df2
MD
866
867 lttng_ht_destroy(consumer_data.stream_ht);
868 lttng_ht_destroy(consumer_data.channel_ht);
3bd1e081
MD
869}
870
871/*
872 * Called from signal handler.
873 */
874void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
875{
876 int ret;
877 consumer_quit = 1;
878 ret = write(ctx->consumer_should_quit[1], "4", 1);
879 if (ret < 0) {
880 perror("write consumer quit");
881 }
882}
883
00e2e675
DG
884void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
885 off_t orig_offset)
3bd1e081
MD
886{
887 int outfd = stream->out_fd;
888
889 /*
890 * This does a blocking write-and-wait on any page that belongs to the
891 * subbuffer prior to the one we just wrote.
892 * Don't care about error values, as these are just hints and ways to
893 * limit the amount of page cache used.
894 */
895 if (orig_offset < stream->chan->max_sb_size) {
896 return;
897 }
b9182dd9 898 lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
3bd1e081
MD
899 stream->chan->max_sb_size,
900 SYNC_FILE_RANGE_WAIT_BEFORE
901 | SYNC_FILE_RANGE_WRITE
902 | SYNC_FILE_RANGE_WAIT_AFTER);
903 /*
904 * Give hints to the kernel about how we access the file:
905 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
906 * we write it.
907 *
908 * We need to call fadvise again after the file grows because the
909 * kernel does not seem to apply fadvise to non-existing parts of the
910 * file.
911 *
912 * Call fadvise _after_ having waited for the page writeback to
913 * complete because the dirty page writeback semantic is not well
914 * defined. So it can be expected to lead to lower throughput in
915 * streaming.
916 */
917 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
918 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
919}
920
921/*
922 * Initialise the necessary environnement :
923 * - create a new context
924 * - create the poll_pipe
925 * - create the should_quit pipe (for signal handler)
926 * - create the thread pipe (for splice)
927 *
928 * Takes a function pointer as argument, this function is called when data is
929 * available on a buffer. This function is responsible to do the
930 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
931 * buffer configuration and then kernctl_put_next_subbuf at the end.
932 *
933 * Returns a pointer to the new context or NULL on error.
934 */
935struct lttng_consumer_local_data *lttng_consumer_create(
936 enum lttng_consumer_type type,
4078b776 937 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
d41f73b7 938 struct lttng_consumer_local_data *ctx),
3bd1e081
MD
939 int (*recv_channel)(struct lttng_consumer_channel *channel),
940 int (*recv_stream)(struct lttng_consumer_stream *stream),
941 int (*update_stream)(int stream_key, uint32_t state))
942{
943 int ret, i;
944 struct lttng_consumer_local_data *ctx;
945
946 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
947 consumer_data.type == type);
948 consumer_data.type = type;
949
effcf122 950 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
3bd1e081
MD
951 if (ctx == NULL) {
952 perror("allocating context");
953 goto error;
954 }
955
956 ctx->consumer_error_socket = -1;
957 /* assign the callbacks */
958 ctx->on_buffer_ready = buffer_ready;
959 ctx->on_recv_channel = recv_channel;
960 ctx->on_recv_stream = recv_stream;
961 ctx->on_update_stream = update_stream;
962
963 ret = pipe(ctx->consumer_poll_pipe);
964 if (ret < 0) {
965 perror("Error creating poll pipe");
966 goto error_poll_pipe;
967 }
968
04fdd819
MD
969 /* set read end of the pipe to non-blocking */
970 ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
971 if (ret < 0) {
972 perror("fcntl O_NONBLOCK");
973 goto error_poll_fcntl;
974 }
975
976 /* set write end of the pipe to non-blocking */
977 ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
978 if (ret < 0) {
979 perror("fcntl O_NONBLOCK");
980 goto error_poll_fcntl;
981 }
982
3bd1e081
MD
983 ret = pipe(ctx->consumer_should_quit);
984 if (ret < 0) {
985 perror("Error creating recv pipe");
986 goto error_quit_pipe;
987 }
988
989 ret = pipe(ctx->consumer_thread_pipe);
990 if (ret < 0) {
991 perror("Error creating thread pipe");
992 goto error_thread_pipe;
993 }
994
995 return ctx;
996
997
998error_thread_pipe:
999 for (i = 0; i < 2; i++) {
1000 int err;
1001
1002 err = close(ctx->consumer_should_quit[i]);
4c462e79
MD
1003 if (err) {
1004 PERROR("close");
1005 }
3bd1e081 1006 }
04fdd819 1007error_poll_fcntl:
3bd1e081
MD
1008error_quit_pipe:
1009 for (i = 0; i < 2; i++) {
1010 int err;
1011
1012 err = close(ctx->consumer_poll_pipe[i]);
4c462e79
MD
1013 if (err) {
1014 PERROR("close");
1015 }
3bd1e081
MD
1016 }
1017error_poll_pipe:
1018 free(ctx);
1019error:
1020 return NULL;
1021}
1022
1023/*
1024 * Close all fds associated with the instance and free the context.
1025 */
1026void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1027{
4c462e79
MD
1028 int ret;
1029
1030 ret = close(ctx->consumer_error_socket);
1031 if (ret) {
1032 PERROR("close");
1033 }
1034 ret = close(ctx->consumer_thread_pipe[0]);
1035 if (ret) {
1036 PERROR("close");
1037 }
1038 ret = close(ctx->consumer_thread_pipe[1]);
1039 if (ret) {
1040 PERROR("close");
1041 }
1042 ret = close(ctx->consumer_poll_pipe[0]);
1043 if (ret) {
1044 PERROR("close");
1045 }
1046 ret = close(ctx->consumer_poll_pipe[1]);
1047 if (ret) {
1048 PERROR("close");
1049 }
1050 ret = close(ctx->consumer_should_quit[0]);
1051 if (ret) {
1052 PERROR("close");
1053 }
1054 ret = close(ctx->consumer_should_quit[1]);
1055 if (ret) {
1056 PERROR("close");
1057 }
3bd1e081
MD
1058 unlink(ctx->consumer_command_sock_path);
1059 free(ctx);
1060}
1061
1062/*
1063 * Mmap the ring buffer, read it and write the data to the tracefile.
1064 *
1065 * Returns the number of bytes written
1066 */
4078b776 1067ssize_t lttng_consumer_on_read_subbuffer_mmap(
3bd1e081
MD
1068 struct lttng_consumer_local_data *ctx,
1069 struct lttng_consumer_stream *stream, unsigned long len)
1070{
1071 switch (consumer_data.type) {
1072 case LTTNG_CONSUMER_KERNEL:
1073 return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
7753dea8
MD
1074 case LTTNG_CONSUMER32_UST:
1075 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1076 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
1077 default:
1078 ERR("Unknown consumer_data type");
1079 assert(0);
1080 }
b9182dd9
DG
1081
1082 return 0;
3bd1e081
MD
1083}
1084
1085/*
1086 * Splice the data from the ring buffer to the tracefile.
1087 *
1088 * Returns the number of bytes spliced.
1089 */
4078b776 1090ssize_t lttng_consumer_on_read_subbuffer_splice(
3bd1e081
MD
1091 struct lttng_consumer_local_data *ctx,
1092 struct lttng_consumer_stream *stream, unsigned long len)
1093{
1094 switch (consumer_data.type) {
1095 case LTTNG_CONSUMER_KERNEL:
1096 return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
7753dea8
MD
1097 case LTTNG_CONSUMER32_UST:
1098 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1099 return -ENOSYS;
1100 default:
1101 ERR("Unknown consumer_data type");
1102 assert(0);
1103 return -ENOSYS;
1104 }
1105
1106}
1107
1108/*
1109 * Take a snapshot for a specific fd
1110 *
1111 * Returns 0 on success, < 0 on error
1112 */
1113int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
1114 struct lttng_consumer_stream *stream)
1115{
1116 switch (consumer_data.type) {
1117 case LTTNG_CONSUMER_KERNEL:
1118 return lttng_kconsumer_take_snapshot(ctx, stream);
7753dea8
MD
1119 case LTTNG_CONSUMER32_UST:
1120 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1121 return lttng_ustconsumer_take_snapshot(ctx, stream);
1122 default:
1123 ERR("Unknown consumer_data type");
1124 assert(0);
1125 return -ENOSYS;
1126 }
1127
1128}
1129
1130/*
1131 * Get the produced position
1132 *
1133 * Returns 0 on success, < 0 on error
1134 */
1135int lttng_consumer_get_produced_snapshot(
1136 struct lttng_consumer_local_data *ctx,
1137 struct lttng_consumer_stream *stream,
1138 unsigned long *pos)
1139{
1140 switch (consumer_data.type) {
1141 case LTTNG_CONSUMER_KERNEL:
1142 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
7753dea8
MD
1143 case LTTNG_CONSUMER32_UST:
1144 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1145 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
1146 default:
1147 ERR("Unknown consumer_data type");
1148 assert(0);
1149 return -ENOSYS;
1150 }
1151}
1152
1153int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1154 int sock, struct pollfd *consumer_sockpoll)
1155{
1156 switch (consumer_data.type) {
1157 case LTTNG_CONSUMER_KERNEL:
1158 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
1159 case LTTNG_CONSUMER32_UST:
1160 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1161 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1162 default:
1163 ERR("Unknown consumer_data type");
1164 assert(0);
1165 return -ENOSYS;
1166 }
1167}
1168
1169/*
e4421fec 1170 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
1171 * it to tracefile if necessary.
1172 */
1173void *lttng_consumer_thread_poll_fds(void *data)
1174{
1175 int num_rdy, num_hup, high_prio, ret, i;
1176 struct pollfd *pollfd = NULL;
1177 /* local view of the streams */
1178 struct lttng_consumer_stream **local_stream = NULL;
1179 /* local view of consumer_data.fds_count */
1180 int nb_fd = 0;
3bd1e081 1181 struct lttng_consumer_local_data *ctx = data;
00e2e675
DG
1182 struct lttng_ht *metadata_ht;
1183 struct lttng_ht_iter iter;
1184 struct lttng_ht_node_ulong *node;
1185 struct lttng_consumer_stream *metadata_stream;
1186 ssize_t len;
1187
1188 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
3bd1e081 1189
e7b994a3
DG
1190 rcu_register_thread();
1191
effcf122 1192 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
3bd1e081
MD
1193
1194 while (1) {
1195 high_prio = 0;
1196 num_hup = 0;
1197
1198 /*
e4421fec 1199 * the fds set has been updated, we need to update our
3bd1e081
MD
1200 * local array as well
1201 */
1202 pthread_mutex_lock(&consumer_data.lock);
1203 if (consumer_data.need_update) {
1204 if (pollfd != NULL) {
1205 free(pollfd);
1206 pollfd = NULL;
1207 }
1208 if (local_stream != NULL) {
1209 free(local_stream);
1210 local_stream = NULL;
1211 }
1212
1213 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 1214 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
3bd1e081
MD
1215 if (pollfd == NULL) {
1216 perror("pollfd malloc");
1217 pthread_mutex_unlock(&consumer_data.lock);
1218 goto end;
1219 }
1220
1221 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 1222 local_stream = zmalloc((consumer_data.stream_count + 1) *
3bd1e081
MD
1223 sizeof(struct lttng_consumer_stream));
1224 if (local_stream == NULL) {
1225 perror("local_stream malloc");
1226 pthread_mutex_unlock(&consumer_data.lock);
1227 goto end;
1228 }
00e2e675
DG
1229 ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
1230 metadata_ht);
3bd1e081
MD
1231 if (ret < 0) {
1232 ERR("Error in allocating pollfd or local_outfds");
1233 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
1234 pthread_mutex_unlock(&consumer_data.lock);
1235 goto end;
1236 }
1237 nb_fd = ret;
1238 consumer_data.need_update = 0;
1239 }
1240 pthread_mutex_unlock(&consumer_data.lock);
1241
4078b776
MD
1242 /* No FDs and consumer_quit, consumer_cleanup the thread */
1243 if (nb_fd == 0 && consumer_quit == 1) {
1244 goto end;
1245 }
3bd1e081 1246 /* poll on the array of fds */
88f2b785 1247 restart:
3bd1e081
MD
1248 DBG("polling on %d fd", nb_fd + 1);
1249 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
1250 DBG("poll num_rdy : %d", num_rdy);
1251 if (num_rdy == -1) {
88f2b785
MD
1252 /*
1253 * Restart interrupted system call.
1254 */
1255 if (errno == EINTR) {
1256 goto restart;
1257 }
3bd1e081
MD
1258 perror("Poll error");
1259 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
1260 goto end;
1261 } else if (num_rdy == 0) {
1262 DBG("Polling thread timed out");
1263 goto end;
1264 }
1265
3bd1e081 1266 /*
00e2e675
DG
1267 * If the consumer_poll_pipe triggered poll go directly to the
1268 * beginning of the loop to update the array. We want to prioritize
1269 * array update over low-priority reads.
3bd1e081 1270 */
509bb1cf 1271 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
04fdd819
MD
1272 size_t pipe_readlen;
1273 char tmp;
1274
3bd1e081 1275 DBG("consumer_poll_pipe wake up");
04fdd819
MD
1276 /* Consume 1 byte of pipe data */
1277 do {
1278 pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
1279 } while (pipe_readlen == -1 && errno == EINTR);
3bd1e081
MD
1280 continue;
1281 }
1282
1283 /* Take care of high priority channels first. */
1284 for (i = 0; i < nb_fd; i++) {
00e2e675
DG
1285 /* Lookup for metadata which is the highest priority */
1286 lttng_ht_lookup(metadata_ht,
1287 (void *)((unsigned long) pollfd[i].fd), &iter);
1288 node = lttng_ht_iter_get_node_ulong(&iter);
1289 if (node != NULL &&
1290 (pollfd[i].revents & (POLLIN | POLLPRI))) {
1291 DBG("Urgent metadata read on fd %d", pollfd[i].fd);
1292 metadata_stream = caa_container_of(node,
1293 struct lttng_consumer_stream, waitfd_node);
1294 high_prio = 1;
1295 len = ctx->on_buffer_ready(metadata_stream, ctx);
1296 /* it's ok to have an unavailable sub-buffer */
1297 if (len < 0 && len != -EAGAIN) {
1298 goto end;
1299 } else if (len > 0) {
1300 metadata_stream->data_read = 1;
1301 }
1302 } else if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
1303 DBG("Urgent read on fd %d", pollfd[i].fd);
1304 high_prio = 1;
4078b776 1305 len = ctx->on_buffer_ready(local_stream[i], ctx);
d41f73b7 1306 /* it's ok to have an unavailable sub-buffer */
4078b776
MD
1307 if (len < 0 && len != -EAGAIN) {
1308 goto end;
1309 } else if (len > 0) {
1310 local_stream[i]->data_read = 1;
d41f73b7 1311 }
3bd1e081
MD
1312 }
1313 }
1314
4078b776
MD
1315 /*
1316 * If we read high prio channel in this loop, try again
1317 * for more high prio data.
1318 */
1319 if (high_prio) {
3bd1e081
MD
1320 continue;
1321 }
1322
1323 /* Take care of low priority channels. */
4078b776
MD
1324 for (i = 0; i < nb_fd; i++) {
1325 if ((pollfd[i].revents & POLLIN) ||
1326 local_stream[i]->hangup_flush_done) {
4078b776
MD
1327 DBG("Normal read on fd %d", pollfd[i].fd);
1328 len = ctx->on_buffer_ready(local_stream[i], ctx);
1329 /* it's ok to have an unavailable sub-buffer */
1330 if (len < 0 && len != -EAGAIN) {
1331 goto end;
1332 } else if (len > 0) {
1333 local_stream[i]->data_read = 1;
1334 }
1335 }
1336 }
1337
1338 /* Handle hangup and errors */
1339 for (i = 0; i < nb_fd; i++) {
1340 if (!local_stream[i]->hangup_flush_done
1341 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
1342 && (consumer_data.type == LTTNG_CONSUMER32_UST
1343 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
1344 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1345 pollfd[i].fd);
1346 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
1347 /* Attempt read again, for the data we just flushed. */
1348 local_stream[i]->data_read = 1;
1349 }
1350 /*
1351 * If the poll flag is HUP/ERR/NVAL and we have
1352 * read no data in this pass, we can remove the
1353 * stream from its hash table.
1354 */
1355 if ((pollfd[i].revents & POLLHUP)) {
1356 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
1357 if (!local_stream[i]->data_read) {
00e2e675
DG
1358 if (local_stream[i]->metadata_flag) {
1359 iter.iter.node = &local_stream[i]->waitfd_node.node;
1360 ret = lttng_ht_del(metadata_ht, &iter);
1361 assert(!ret);
1362 }
702b1ea4 1363 consumer_del_stream(local_stream[i]);
4078b776
MD
1364 num_hup++;
1365 }
1366 } else if (pollfd[i].revents & POLLERR) {
1367 ERR("Error returned in polling fd %d.", pollfd[i].fd);
1368 if (!local_stream[i]->data_read) {
00e2e675
DG
1369 if (local_stream[i]->metadata_flag) {
1370 iter.iter.node = &local_stream[i]->waitfd_node.node;
1371 ret = lttng_ht_del(metadata_ht, &iter);
1372 assert(!ret);
1373 }
702b1ea4 1374 consumer_del_stream(local_stream[i]);
4078b776
MD
1375 num_hup++;
1376 }
1377 } else if (pollfd[i].revents & POLLNVAL) {
1378 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
1379 if (!local_stream[i]->data_read) {
00e2e675
DG
1380 if (local_stream[i]->metadata_flag) {
1381 iter.iter.node = &local_stream[i]->waitfd_node.node;
1382 ret = lttng_ht_del(metadata_ht, &iter);
1383 assert(!ret);
1384 }
702b1ea4 1385 consumer_del_stream(local_stream[i]);
4078b776 1386 num_hup++;
3bd1e081
MD
1387 }
1388 }
4078b776 1389 local_stream[i]->data_read = 0;
3bd1e081
MD
1390 }
1391 }
1392end:
1393 DBG("polling thread exiting");
1394 if (pollfd != NULL) {
1395 free(pollfd);
1396 pollfd = NULL;
1397 }
1398 if (local_stream != NULL) {
1399 free(local_stream);
1400 local_stream = NULL;
1401 }
e7b994a3 1402 rcu_unregister_thread();
3bd1e081
MD
1403 return NULL;
1404}
1405
1406/*
1407 * This thread listens on the consumerd socket and receives the file
1408 * descriptors from the session daemon.
1409 */
1410void *lttng_consumer_thread_receive_fds(void *data)
1411{
1412 int sock, client_socket, ret;
1413 /*
1414 * structure to poll for incoming data on communication socket avoids
1415 * making blocking sockets.
1416 */
1417 struct pollfd consumer_sockpoll[2];
1418 struct lttng_consumer_local_data *ctx = data;
1419
e7b994a3
DG
1420 rcu_register_thread();
1421
3bd1e081
MD
1422 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
1423 unlink(ctx->consumer_command_sock_path);
1424 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
1425 if (client_socket < 0) {
1426 ERR("Cannot create command socket");
1427 goto end;
1428 }
1429
1430 ret = lttcomm_listen_unix_sock(client_socket);
1431 if (ret < 0) {
1432 goto end;
1433 }
1434
32258573 1435 DBG("Sending ready command to lttng-sessiond");
3bd1e081
MD
1436 ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
1437 /* return < 0 on error, but == 0 is not fatal */
1438 if (ret < 0) {
32258573 1439 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
1440 goto end;
1441 }
1442
1443 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
1444 if (ret < 0) {
1445 perror("fcntl O_NONBLOCK");
1446 goto end;
1447 }
1448
1449 /* prepare the FDs to poll : to client socket and the should_quit pipe */
1450 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
1451 consumer_sockpoll[0].events = POLLIN | POLLPRI;
1452 consumer_sockpoll[1].fd = client_socket;
1453 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1454
1455 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1456 goto end;
1457 }
1458 DBG("Connection on client_socket");
1459
1460 /* Blocking call, waiting for transmission */
1461 sock = lttcomm_accept_unix_sock(client_socket);
1462 if (sock <= 0) {
1463 WARN("On accept");
1464 goto end;
1465 }
1466 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
1467 if (ret < 0) {
1468 perror("fcntl O_NONBLOCK");
1469 goto end;
1470 }
1471
1472 /* update the polling structure to poll on the established socket */
1473 consumer_sockpoll[1].fd = sock;
1474 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1475
1476 while (1) {
1477 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1478 goto end;
1479 }
1480 DBG("Incoming command on sock");
1481 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
1482 if (ret == -ENOENT) {
1483 DBG("Received STOP command");
1484 goto end;
1485 }
1486 if (ret < 0) {
1487 ERR("Communication interrupted on command socket");
1488 goto end;
1489 }
1490 if (consumer_quit) {
1491 DBG("consumer_thread_receive_fds received quit from signal");
1492 goto end;
1493 }
1494 DBG("received fds on sock");
1495 }
1496end:
1497 DBG("consumer_thread_receive_fds exiting");
1498
1499 /*
1500 * when all fds have hung up, the polling thread
1501 * can exit cleanly
1502 */
1503 consumer_quit = 1;
1504
1505 /*
1506 * 2s of grace period, if no polling events occur during
1507 * this period, the polling thread will exit even if there
1508 * are still open FDs (should not happen, but safety mechanism).
1509 */
1510 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
1511
04fdd819
MD
1512 /*
1513 * Wake-up the other end by writing a null byte in the pipe
1514 * (non-blocking). Important note: Because writing into the
1515 * pipe is non-blocking (and therefore we allow dropping wakeup
1516 * data, as long as there is wakeup data present in the pipe
1517 * buffer to wake up the other end), the other end should
1518 * perform the following sequence for waiting:
1519 * 1) empty the pipe (reads).
1520 * 2) perform update operation.
1521 * 3) wait on the pipe (poll).
1522 */
1523 do {
1524 ret = write(ctx->consumer_poll_pipe[1], "", 1);
1525 } while (ret == -1UL && errno == EINTR);
e7b994a3 1526 rcu_unregister_thread();
3bd1e081
MD
1527 return NULL;
1528}
d41f73b7 1529
4078b776 1530ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
1531 struct lttng_consumer_local_data *ctx)
1532{
1533 switch (consumer_data.type) {
1534 case LTTNG_CONSUMER_KERNEL:
1535 return lttng_kconsumer_read_subbuffer(stream, ctx);
7753dea8
MD
1536 case LTTNG_CONSUMER32_UST:
1537 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1538 return lttng_ustconsumer_read_subbuffer(stream, ctx);
1539 default:
1540 ERR("Unknown consumer_data type");
1541 assert(0);
1542 return -ENOSYS;
1543 }
1544}
1545
1546int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
1547{
1548 switch (consumer_data.type) {
1549 case LTTNG_CONSUMER_KERNEL:
1550 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
1551 case LTTNG_CONSUMER32_UST:
1552 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1553 return lttng_ustconsumer_on_recv_stream(stream);
1554 default:
1555 ERR("Unknown consumer_data type");
1556 assert(0);
1557 return -ENOSYS;
1558 }
1559}
e4421fec
DG
1560
1561/*
1562 * Allocate and set consumer data hash tables.
1563 */
1564void lttng_consumer_init(void)
1565{
1566 consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1567 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675 1568 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
e4421fec 1569}
This page took 0.101692 seconds and 5 git commands to generate.