Implement snapshot commands in lttng-sessiond
[lttng-tools.git] / src / common / consumer.c
CommitLineData
3bd1e081
MD
/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *               2012 - David Goulet <dgoulet@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
19
20#define _GNU_SOURCE
21#include <assert.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
77c7c900 30#include <inttypes.h>
331744e3 31#include <signal.h>
3bd1e081 32
990570ed 33#include <common/common.h>
fb3a43a9
DG
34#include <common/utils.h>
35#include <common/compat/poll.h>
10a8a223 36#include <common/kernel-ctl/kernel-ctl.h>
00e2e675 37#include <common/sessiond-comm/relayd.h>
10a8a223
DG
38#include <common/sessiond-comm/sessiond-comm.h>
39#include <common/kernel-consumer/kernel-consumer.h>
00e2e675 40#include <common/relayd/relayd.h>
10a8a223
DG
41#include <common/ust-consumer/ust-consumer.h>
42
43#include "consumer.h"
1d1a276c 44#include "consumer-stream.h"
3bd1e081
MD
45
46struct lttng_consumer_global_data consumer_data = {
3bd1e081
MD
47 .stream_count = 0,
48 .need_update = 1,
49 .type = LTTNG_CONSUMER_UNKNOWN,
50};
51
/* Actions that can be requested over the consumer channel pipe. */
enum consumer_channel_action {
	CONSUMER_CHANNEL_ADD,
	CONSUMER_CHANNEL_DEL,
	CONSUMER_CHANNEL_QUIT,
};

/*
 * Message exchanged on the channel pipe between consumer threads.
 * Only the field matching the action is meaningful.
 */
struct consumer_channel_msg {
	enum consumer_channel_action action;
	struct lttng_consumer_channel *chan;	/* add */
	uint64_t key;				/* del */
};
63
3bd1e081
MD
/*
 * Flag to inform the polling thread to quit when all fd hung up. Updated by
 * the consumer_thread_receive_fds when it notices that all fds has hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
volatile int consumer_quit;

/*
 * Global hash table containing respectively metadata and data streams. The
 * stream element in this ht should only be updated by the metadata poll thread
 * for the metadata and the data poll thread for the data.
 */
static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
acdb9057
DG
80/*
81 * Notify a thread lttng pipe to poll back again. This usually means that some
82 * global state has changed so we just send back the thread in a poll wait
83 * call.
84 */
85static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
86{
87 struct lttng_consumer_stream *null_stream = NULL;
88
89 assert(pipe);
90
91 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
92}
93
d8ef542d
MD
94static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
95 struct lttng_consumer_channel *chan,
a0cbdd2e 96 uint64_t key,
d8ef542d
MD
97 enum consumer_channel_action action)
98{
99 struct consumer_channel_msg msg;
100 int ret;
101
e56251fc
DG
102 memset(&msg, 0, sizeof(msg));
103
d8ef542d
MD
104 msg.action = action;
105 msg.chan = chan;
f21dae48 106 msg.key = key;
d8ef542d
MD
107 do {
108 ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
109 } while (ret < 0 && errno == EINTR);
110}
111
a0cbdd2e
MD
112void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
113 uint64_t key)
114{
115 notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
116}
117
d8ef542d
MD
118static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
119 struct lttng_consumer_channel **chan,
a0cbdd2e 120 uint64_t *key,
d8ef542d
MD
121 enum consumer_channel_action *action)
122{
123 struct consumer_channel_msg msg;
124 int ret;
125
126 do {
127 ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
128 } while (ret < 0 && errno == EINTR);
129 if (ret > 0) {
130 *action = msg.action;
131 *chan = msg.chan;
a0cbdd2e 132 *key = msg.key;
d8ef542d
MD
133 }
134 return ret;
135}
136
3bd1e081
MD
137/*
138 * Find a stream. The consumer_data.lock must be locked during this
139 * call.
140 */
d88aee68 141static struct lttng_consumer_stream *find_stream(uint64_t key,
8389e4f8 142 struct lttng_ht *ht)
3bd1e081 143{
e4421fec 144 struct lttng_ht_iter iter;
d88aee68 145 struct lttng_ht_node_u64 *node;
e4421fec 146 struct lttng_consumer_stream *stream = NULL;
3bd1e081 147
8389e4f8
DG
148 assert(ht);
149
d88aee68
DG
150 /* -1ULL keys are lookup failures */
151 if (key == (uint64_t) -1ULL) {
7ad0a0cb 152 return NULL;
7a57cf92 153 }
e4421fec 154
6065ceec
DG
155 rcu_read_lock();
156
d88aee68
DG
157 lttng_ht_lookup(ht, &key, &iter);
158 node = lttng_ht_iter_get_node_u64(&iter);
e4421fec
DG
159 if (node != NULL) {
160 stream = caa_container_of(node, struct lttng_consumer_stream, node);
3bd1e081 161 }
e4421fec 162
6065ceec
DG
163 rcu_read_unlock();
164
e4421fec 165 return stream;
3bd1e081
MD
166}
167
ffe60014 168static void steal_stream_key(int key, struct lttng_ht *ht)
7ad0a0cb
MD
169{
170 struct lttng_consumer_stream *stream;
171
04253271 172 rcu_read_lock();
ffe60014 173 stream = find_stream(key, ht);
04253271 174 if (stream) {
d88aee68 175 stream->key = -1ULL;
04253271
MD
176 /*
177 * We don't want the lookup to match, but we still need
178 * to iterate on this stream when iterating over the hash table. Just
179 * change the node key.
180 */
d88aee68 181 stream->node.key = -1ULL;
04253271
MD
182 }
183 rcu_read_unlock();
7ad0a0cb
MD
184}
185
d56db448
DG
186/*
187 * Return a channel object for the given key.
188 *
189 * RCU read side lock MUST be acquired before calling this function and
190 * protects the channel ptr.
191 */
d88aee68 192struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
3bd1e081 193{
e4421fec 194 struct lttng_ht_iter iter;
d88aee68 195 struct lttng_ht_node_u64 *node;
e4421fec 196 struct lttng_consumer_channel *channel = NULL;
3bd1e081 197
d88aee68
DG
198 /* -1ULL keys are lookup failures */
199 if (key == (uint64_t) -1ULL) {
7ad0a0cb 200 return NULL;
7a57cf92 201 }
e4421fec 202
d88aee68
DG
203 lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
204 node = lttng_ht_iter_get_node_u64(&iter);
e4421fec
DG
205 if (node != NULL) {
206 channel = caa_container_of(node, struct lttng_consumer_channel, node);
3bd1e081 207 }
e4421fec
DG
208
209 return channel;
3bd1e081
MD
210}
211
ffe60014 212static void free_stream_rcu(struct rcu_head *head)
7ad0a0cb 213{
d88aee68
DG
214 struct lttng_ht_node_u64 *node =
215 caa_container_of(head, struct lttng_ht_node_u64, head);
ffe60014
DG
216 struct lttng_consumer_stream *stream =
217 caa_container_of(node, struct lttng_consumer_stream, node);
7ad0a0cb 218
ffe60014 219 free(stream);
7ad0a0cb
MD
220}
221
ffe60014 222static void free_channel_rcu(struct rcu_head *head)
702b1ea4 223{
d88aee68
DG
224 struct lttng_ht_node_u64 *node =
225 caa_container_of(head, struct lttng_ht_node_u64, head);
ffe60014
DG
226 struct lttng_consumer_channel *channel =
227 caa_container_of(node, struct lttng_consumer_channel, node);
702b1ea4 228
ffe60014 229 free(channel);
702b1ea4
MD
230}
231
00e2e675
DG
232/*
233 * RCU protected relayd socket pair free.
234 */
ffe60014 235static void free_relayd_rcu(struct rcu_head *head)
00e2e675 236{
d88aee68
DG
237 struct lttng_ht_node_u64 *node =
238 caa_container_of(head, struct lttng_ht_node_u64, head);
00e2e675
DG
239 struct consumer_relayd_sock_pair *relayd =
240 caa_container_of(node, struct consumer_relayd_sock_pair, node);
241
8994307f
DG
242 /*
243 * Close all sockets. This is done in the call RCU since we don't want the
244 * socket fds to be reassigned thus potentially creating bad state of the
245 * relayd object.
246 *
247 * We do not have to lock the control socket mutex here since at this stage
248 * there is no one referencing to this relayd object.
249 */
250 (void) relayd_close(&relayd->control_sock);
251 (void) relayd_close(&relayd->data_sock);
252
00e2e675
DG
253 free(relayd);
254}
255
256/*
257 * Destroy and free relayd socket pair object.
00e2e675 258 */
51230d70 259void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
260{
261 int ret;
262 struct lttng_ht_iter iter;
263
173af62f
DG
264 if (relayd == NULL) {
265 return;
266 }
267
00e2e675
DG
268 DBG("Consumer destroy and close relayd socket pair");
269
270 iter.iter.node = &relayd->node.node;
271 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
173af62f 272 if (ret != 0) {
8994307f 273 /* We assume the relayd is being or is destroyed */
173af62f
DG
274 return;
275 }
00e2e675 276
00e2e675 277 /* RCU free() call */
ffe60014
DG
278 call_rcu(&relayd->node.head, free_relayd_rcu);
279}
280
281/*
282 * Remove a channel from the global list protected by a mutex. This function is
283 * also responsible for freeing its data structures.
284 */
285void consumer_del_channel(struct lttng_consumer_channel *channel)
286{
287 int ret;
288 struct lttng_ht_iter iter;
f2a444f1 289 struct lttng_consumer_stream *stream, *stmp;
ffe60014 290
d88aee68 291 DBG("Consumer delete channel key %" PRIu64, channel->key);
ffe60014
DG
292
293 pthread_mutex_lock(&consumer_data.lock);
294
295 switch (consumer_data.type) {
296 case LTTNG_CONSUMER_KERNEL:
297 break;
298 case LTTNG_CONSUMER32_UST:
299 case LTTNG_CONSUMER64_UST:
f2a444f1
DG
300 /* Delete streams that might have been left in the stream list. */
301 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
302 send_node) {
303 cds_list_del(&stream->send_node);
304 lttng_ustconsumer_del_stream(stream);
305 free(stream);
306 }
ffe60014
DG
307 lttng_ustconsumer_del_channel(channel);
308 break;
309 default:
310 ERR("Unknown consumer_data type");
311 assert(0);
312 goto end;
313 }
314
315 rcu_read_lock();
316 iter.iter.node = &channel->node.node;
317 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
318 assert(!ret);
319 rcu_read_unlock();
320
321 call_rcu(&channel->node.head, free_channel_rcu);
322end:
323 pthread_mutex_unlock(&consumer_data.lock);
00e2e675
DG
324}
325
228b5bf7
DG
326/*
327 * Iterate over the relayd hash table and destroy each element. Finally,
328 * destroy the whole hash table.
329 */
330static void cleanup_relayd_ht(void)
331{
332 struct lttng_ht_iter iter;
333 struct consumer_relayd_sock_pair *relayd;
334
335 rcu_read_lock();
336
337 cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
338 node.node) {
51230d70 339 consumer_destroy_relayd(relayd);
228b5bf7
DG
340 }
341
228b5bf7 342 rcu_read_unlock();
36b588ed
MD
343
344 lttng_ht_destroy(consumer_data.relayd_ht);
228b5bf7
DG
345}
346
8994307f
DG
347/*
348 * Update the end point status of all streams having the given network sequence
349 * index (relayd index).
350 *
351 * It's atomically set without having the stream mutex locked which is fine
352 * because we handle the write/read race with a pipe wakeup for each thread.
353 */
354static void update_endpoint_status_by_netidx(int net_seq_idx,
355 enum consumer_endpoint_status status)
356{
357 struct lttng_ht_iter iter;
358 struct lttng_consumer_stream *stream;
359
360 DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
361
362 rcu_read_lock();
363
364 /* Let's begin with metadata */
365 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
366 if (stream->net_seq_idx == net_seq_idx) {
367 uatomic_set(&stream->endpoint_status, status);
368 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
369 }
370 }
371
372 /* Follow up by the data streams */
373 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
374 if (stream->net_seq_idx == net_seq_idx) {
375 uatomic_set(&stream->endpoint_status, status);
376 DBG("Delete flag set to data stream %d", stream->wait_fd);
377 }
378 }
379 rcu_read_unlock();
380}
381
382/*
383 * Cleanup a relayd object by flagging every associated streams for deletion,
384 * destroying the object meaning removing it from the relayd hash table,
385 * closing the sockets and freeing the memory in a RCU call.
386 *
387 * If a local data context is available, notify the threads that the streams'
388 * state have changed.
389 */
390static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
391 struct lttng_consumer_local_data *ctx)
392{
393 int netidx;
394
395 assert(relayd);
396
9617607b
DG
397 DBG("Cleaning up relayd sockets");
398
8994307f
DG
399 /* Save the net sequence index before destroying the object */
400 netidx = relayd->net_seq_idx;
401
402 /*
403 * Delete the relayd from the relayd hash table, close the sockets and free
404 * the object in a RCU call.
405 */
51230d70 406 consumer_destroy_relayd(relayd);
8994307f
DG
407
408 /* Set inactive endpoint to all streams */
409 update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
410
411 /*
412 * With a local data context, notify the threads that the streams' state
413 * have changed. The write() action on the pipe acts as an "implicit"
414 * memory barrier ordering the updates of the end point status from the
415 * read of this status which happens AFTER receiving this notify.
416 */
417 if (ctx) {
acdb9057 418 notify_thread_lttng_pipe(ctx->consumer_data_pipe);
13886d2d 419 notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
8994307f
DG
420 }
421}
422
a6ba4fe1
DG
423/*
424 * Flag a relayd socket pair for destruction. Destroy it if the refcount
425 * reaches zero.
426 *
427 * RCU read side lock MUST be aquired before calling this function.
428 */
429void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
430{
431 assert(relayd);
432
433 /* Set destroy flag for this object */
434 uatomic_set(&relayd->destroy_flag, 1);
435
436 /* Destroy the relayd if refcount is 0 */
437 if (uatomic_read(&relayd->refcount) == 0) {
51230d70 438 consumer_destroy_relayd(relayd);
a6ba4fe1
DG
439 }
440}
441
/*
 * Completely destroy a stream from every visible data structure and, if
 * given, the hash table.
 *
 * Once this call returns, the stream object is no longer usable nor visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	consumer_stream_destroy(stream, ht);
}
453
d88aee68
DG
454struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
455 uint64_t stream_key,
3bd1e081 456 enum lttng_consumer_stream_state state,
ffe60014 457 const char *channel_name,
6df2e2c9 458 uid_t uid,
00e2e675 459 gid_t gid,
57a269f2 460 uint64_t relayd_id,
53632229 461 uint64_t session_id,
ffe60014
DG
462 int cpu,
463 int *alloc_ret,
464 enum consumer_channel_type type)
3bd1e081 465{
ffe60014 466 int ret;
3bd1e081 467 struct lttng_consumer_stream *stream;
3bd1e081 468
effcf122 469 stream = zmalloc(sizeof(*stream));
3bd1e081 470 if (stream == NULL) {
7a57cf92 471 PERROR("malloc struct lttng_consumer_stream");
ffe60014 472 ret = -ENOMEM;
7a57cf92 473 goto end;
3bd1e081 474 }
7a57cf92 475
d56db448
DG
476 rcu_read_lock();
477
3bd1e081 478 stream->key = stream_key;
3bd1e081
MD
479 stream->out_fd = -1;
480 stream->out_fd_offset = 0;
481 stream->state = state;
6df2e2c9
MD
482 stream->uid = uid;
483 stream->gid = gid;
ffe60014 484 stream->net_seq_idx = relayd_id;
53632229 485 stream->session_id = session_id;
53632229 486 pthread_mutex_init(&stream->lock, NULL);
58b1f425 487
ffe60014
DG
488 /* If channel is the metadata, flag this stream as metadata. */
489 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
490 stream->metadata_flag = 1;
491 /* Metadata is flat out. */
492 strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
58b1f425 493 } else {
ffe60014
DG
494 /* Format stream name to <channel_name>_<cpu_number> */
495 ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
496 channel_name, cpu);
497 if (ret < 0) {
498 PERROR("snprintf stream name");
499 goto error;
500 }
58b1f425 501 }
c30aaa51 502
ffe60014 503 /* Key is always the wait_fd for streams. */
d88aee68 504 lttng_ht_node_init_u64(&stream->node, stream->key);
ffe60014 505
d8ef542d
MD
506 /* Init node per channel id key */
507 lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
508
53632229 509 /* Init session id node with the stream session id */
d88aee68 510 lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
53632229 511
d8ef542d
MD
512 DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 ", session_id %" PRIu64,
513 stream->name, stream->key, channel_key, stream->net_seq_idx, stream->session_id);
d56db448
DG
514
515 rcu_read_unlock();
3bd1e081 516 return stream;
c80048c6
MD
517
518error:
d56db448 519 rcu_read_unlock();
c80048c6 520 free(stream);
7a57cf92 521end:
ffe60014
DG
522 if (alloc_ret) {
523 *alloc_ret = ret;
524 }
c80048c6 525 return NULL;
3bd1e081
MD
526}
527
528/*
529 * Add a stream to the global list protected by a mutex.
530 */
ffe60014 531static int add_stream(struct lttng_consumer_stream *stream,
43c34bc3 532 struct lttng_ht *ht)
3bd1e081
MD
533{
534 int ret = 0;
00e2e675 535 struct consumer_relayd_sock_pair *relayd;
3bd1e081 536
e316aad5 537 assert(stream);
43c34bc3 538 assert(ht);
c77fc10a 539
d88aee68 540 DBG3("Adding consumer stream %" PRIu64, stream->key);
e316aad5
DG
541
542 pthread_mutex_lock(&consumer_data.lock);
2e818a6a 543 pthread_mutex_lock(&stream->lock);
b0b335c8 544 rcu_read_lock();
e316aad5 545
43c34bc3 546 /* Steal stream identifier to avoid having streams with the same key */
ffe60014 547 steal_stream_key(stream->key, ht);
43c34bc3 548
d88aee68 549 lttng_ht_add_unique_u64(ht, &stream->node);
00e2e675 550
d8ef542d
MD
551 lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
552 &stream->node_channel_id);
553
ca22feea
DG
554 /*
555 * Add stream to the stream_list_ht of the consumer data. No need to steal
556 * the key since the HT does not use it and we allow to add redundant keys
557 * into this table.
558 */
d88aee68 559 lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
ca22feea 560
00e2e675
DG
561 /* Check and cleanup relayd */
562 relayd = consumer_find_relayd(stream->net_seq_idx);
563 if (relayd != NULL) {
b0b335c8 564 uatomic_inc(&relayd->refcount);
00e2e675
DG
565 }
566
e316aad5 567 /*
ffe60014
DG
568 * When nb_init_stream_left reaches 0, we don't need to trigger any action
569 * in terms of destroying the associated channel, because the action that
e316aad5
DG
570 * causes the count to become 0 also causes a stream to be added. The
571 * channel deletion will thus be triggered by the following removal of this
572 * stream.
573 */
ffe60014 574 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
f2ad556d
MD
575 /* Increment refcount before decrementing nb_init_stream_left */
576 cmm_smp_wmb();
ffe60014 577 uatomic_dec(&stream->chan->nb_init_stream_left);
e316aad5
DG
578 }
579
580 /* Update consumer data once the node is inserted. */
3bd1e081
MD
581 consumer_data.stream_count++;
582 consumer_data.need_update = 1;
583
e316aad5 584 rcu_read_unlock();
2e818a6a 585 pthread_mutex_unlock(&stream->lock);
3bd1e081 586 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 587
3bd1e081
MD
588 return ret;
589}
590
00e2e675 591/*
3f8e211f
DG
592 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
593 * be acquired before calling this.
00e2e675 594 */
d09e1200 595static int add_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
596{
597 int ret = 0;
d88aee68 598 struct lttng_ht_node_u64 *node;
00e2e675
DG
599 struct lttng_ht_iter iter;
600
ffe60014 601 assert(relayd);
00e2e675 602
00e2e675 603 lttng_ht_lookup(consumer_data.relayd_ht,
d88aee68
DG
604 &relayd->net_seq_idx, &iter);
605 node = lttng_ht_iter_get_node_u64(&iter);
00e2e675 606 if (node != NULL) {
00e2e675
DG
607 goto end;
608 }
d88aee68 609 lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
00e2e675 610
00e2e675
DG
611end:
612 return ret;
613}
614
615/*
616 * Allocate and return a consumer relayd socket.
617 */
618struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
619 int net_seq_idx)
620{
621 struct consumer_relayd_sock_pair *obj = NULL;
622
623 /* Negative net sequence index is a failure */
624 if (net_seq_idx < 0) {
625 goto error;
626 }
627
628 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
629 if (obj == NULL) {
630 PERROR("zmalloc relayd sock");
631 goto error;
632 }
633
634 obj->net_seq_idx = net_seq_idx;
635 obj->refcount = 0;
173af62f 636 obj->destroy_flag = 0;
f96e4545
MD
637 obj->control_sock.sock.fd = -1;
638 obj->data_sock.sock.fd = -1;
d88aee68 639 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
00e2e675
DG
640 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
641
642error:
643 return obj;
644}
645
646/*
647 * Find a relayd socket pair in the global consumer data.
648 *
649 * Return the object if found else NULL.
b0b335c8
MD
650 * RCU read-side lock must be held across this call and while using the
651 * returned object.
00e2e675 652 */
d88aee68 653struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
00e2e675
DG
654{
655 struct lttng_ht_iter iter;
d88aee68 656 struct lttng_ht_node_u64 *node;
00e2e675
DG
657 struct consumer_relayd_sock_pair *relayd = NULL;
658
659 /* Negative keys are lookup failures */
d88aee68 660 if (key == (uint64_t) -1ULL) {
00e2e675
DG
661 goto error;
662 }
663
d88aee68 664 lttng_ht_lookup(consumer_data.relayd_ht, &key,
00e2e675 665 &iter);
d88aee68 666 node = lttng_ht_iter_get_node_u64(&iter);
00e2e675
DG
667 if (node != NULL) {
668 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
669 }
670
00e2e675
DG
671error:
672 return relayd;
673}
674
675/*
676 * Handle stream for relayd transmission if the stream applies for network
677 * streaming where the net sequence index is set.
678 *
679 * Return destination file descriptor or negative value on error.
680 */
6197aea7 681static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
1d4dfdef
DG
682 size_t data_size, unsigned long padding,
683 struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
684{
685 int outfd = -1, ret;
00e2e675
DG
686 struct lttcomm_relayd_data_hdr data_hdr;
687
688 /* Safety net */
689 assert(stream);
6197aea7 690 assert(relayd);
00e2e675
DG
691
692 /* Reset data header */
693 memset(&data_hdr, 0, sizeof(data_hdr));
694
00e2e675
DG
695 if (stream->metadata_flag) {
696 /* Caller MUST acquire the relayd control socket lock */
697 ret = relayd_send_metadata(&relayd->control_sock, data_size);
698 if (ret < 0) {
699 goto error;
700 }
701
702 /* Metadata are always sent on the control socket. */
6151a90f 703 outfd = relayd->control_sock.sock.fd;
00e2e675
DG
704 } else {
705 /* Set header with stream information */
706 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
707 data_hdr.data_size = htobe32(data_size);
1d4dfdef 708 data_hdr.padding_size = htobe32(padding);
39df6d9f
DG
709 /*
710 * Note that net_seq_num below is assigned with the *current* value of
711 * next_net_seq_num and only after that the next_net_seq_num will be
712 * increment. This is why when issuing a command on the relayd using
713 * this next value, 1 should always be substracted in order to compare
714 * the last seen sequence number on the relayd side to the last sent.
715 */
3604f373 716 data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
00e2e675
DG
717 /* Other fields are zeroed previously */
718
719 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
720 sizeof(data_hdr));
721 if (ret < 0) {
722 goto error;
723 }
724
3604f373
DG
725 ++stream->next_net_seq_num;
726
00e2e675 727 /* Set to go on data socket */
6151a90f 728 outfd = relayd->data_sock.sock.fd;
00e2e675
DG
729 }
730
731error:
732 return outfd;
733}
734
3bd1e081 735/*
ffe60014
DG
736 * Allocate and return a new lttng_consumer_channel object using the given key
737 * to initialize the hash table node.
738 *
739 * On error, return NULL.
3bd1e081 740 */
886224ff 741struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
ffe60014
DG
742 uint64_t session_id,
743 const char *pathname,
744 const char *name,
745 uid_t uid,
746 gid_t gid,
57a269f2 747 uint64_t relayd_id,
1624d5b7
JD
748 enum lttng_event_output output,
749 uint64_t tracefile_size,
750 uint64_t tracefile_count)
3bd1e081
MD
751{
752 struct lttng_consumer_channel *channel;
3bd1e081 753
276b26d1 754 channel = zmalloc(sizeof(*channel));
3bd1e081 755 if (channel == NULL) {
7a57cf92 756 PERROR("malloc struct lttng_consumer_channel");
3bd1e081
MD
757 goto end;
758 }
ffe60014
DG
759
760 channel->key = key;
3bd1e081 761 channel->refcount = 0;
ffe60014
DG
762 channel->session_id = session_id;
763 channel->uid = uid;
764 channel->gid = gid;
765 channel->relayd_id = relayd_id;
766 channel->output = output;
1624d5b7
JD
767 channel->tracefile_size = tracefile_size;
768 channel->tracefile_count = tracefile_count;
ffe60014
DG
769
770 strncpy(channel->pathname, pathname, sizeof(channel->pathname));
771 channel->pathname[sizeof(channel->pathname) - 1] = '\0';
772
773 strncpy(channel->name, name, sizeof(channel->name));
774 channel->name[sizeof(channel->name) - 1] = '\0';
775
d88aee68 776 lttng_ht_node_init_u64(&channel->node, channel->key);
d8ef542d
MD
777
778 channel->wait_fd = -1;
779
ffe60014
DG
780 CDS_INIT_LIST_HEAD(&channel->streams.head);
781
d88aee68 782 DBG("Allocated channel (key %" PRIu64 ")", channel->key)
3bd1e081 783
3bd1e081
MD
784end:
785 return channel;
786}
787
788/*
789 * Add a channel to the global list protected by a mutex.
821fffb2
DG
790 *
791 * On success 0 is returned else a negative value.
3bd1e081 792 */
d8ef542d
MD
793int consumer_add_channel(struct lttng_consumer_channel *channel,
794 struct lttng_consumer_local_data *ctx)
3bd1e081 795{
ffe60014 796 int ret = 0;
d88aee68 797 struct lttng_ht_node_u64 *node;
c77fc10a
DG
798 struct lttng_ht_iter iter;
799
3bd1e081 800 pthread_mutex_lock(&consumer_data.lock);
6065ceec 801 rcu_read_lock();
c77fc10a 802
7972aab2 803 lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
d88aee68 804 node = lttng_ht_iter_get_node_u64(&iter);
c77fc10a
DG
805 if (node != NULL) {
806 /* Channel already exist. Ignore the insertion */
d88aee68
DG
807 ERR("Consumer add channel key %" PRIu64 " already exists!",
808 channel->key);
821fffb2 809 ret = -EEXIST;
c77fc10a
DG
810 goto end;
811 }
812
d88aee68 813 lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
c77fc10a
DG
814
815end:
6065ceec 816 rcu_read_unlock();
3bd1e081 817 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 818
d8ef542d
MD
819 if (!ret && channel->wait_fd != -1 &&
820 channel->metadata_stream == NULL) {
a0cbdd2e 821 notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
d8ef542d 822 }
ffe60014 823 return ret;
3bd1e081
MD
824}
825
826/*
827 * Allocate the pollfd structure and the local view of the out fds to avoid
828 * doing a lookup in the linked list and concurrency issues when writing is
829 * needed. Called with consumer_data.lock held.
830 *
831 * Returns the number of fds in the structures.
832 */
ffe60014
DG
833static int update_poll_array(struct lttng_consumer_local_data *ctx,
834 struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
835 struct lttng_ht *ht)
3bd1e081 836{
3bd1e081 837 int i = 0;
e4421fec
DG
838 struct lttng_ht_iter iter;
839 struct lttng_consumer_stream *stream;
3bd1e081 840
ffe60014
DG
841 assert(ctx);
842 assert(ht);
843 assert(pollfd);
844 assert(local_stream);
845
3bd1e081 846 DBG("Updating poll fd array");
481d6c57 847 rcu_read_lock();
43c34bc3 848 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
8994307f
DG
849 /*
850 * Only active streams with an active end point can be added to the
851 * poll set and local stream storage of the thread.
852 *
853 * There is a potential race here for endpoint_status to be updated
854 * just after the check. However, this is OK since the stream(s) will
855 * be deleted once the thread is notified that the end point state has
856 * changed where this function will be called back again.
857 */
858 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
79d4ffb7 859 stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
3bd1e081
MD
860 continue;
861 }
7972aab2
DG
862 /*
863 * This clobbers way too much the debug output. Uncomment that if you
864 * need it for debugging purposes.
865 *
866 * DBG("Active FD %d", stream->wait_fd);
867 */
e4421fec 868 (*pollfd)[i].fd = stream->wait_fd;
3bd1e081 869 (*pollfd)[i].events = POLLIN | POLLPRI;
e4421fec 870 local_stream[i] = stream;
3bd1e081
MD
871 i++;
872 }
481d6c57 873 rcu_read_unlock();
3bd1e081
MD
874
875 /*
50f8ae69 876 * Insert the consumer_data_pipe at the end of the array and don't
3bd1e081
MD
877 * increment i so nb_fd is the number of real FD.
878 */
acdb9057 879 (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
509bb1cf 880 (*pollfd)[i].events = POLLIN | POLLPRI;
3bd1e081
MD
881 return i;
882}
883
/*
 * Poll on the should_quit pipe and the command socket.
 *
 * Return -1 on error or if the thread should exit, 0 if data is available
 * on the command socket.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

restart:
	num_rdy = poll(consumer_sockpoll, 2, -1);
	if (num_rdy == -1) {
		/*
		 * Restart interrupted system call.
		 */
		if (errno == EINTR) {
			goto restart;
		}
		PERROR("Poll error");
		goto exit;
	}
	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}
913
914/*
915 * Set the error socket.
916 */
ffe60014
DG
917void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
918 int sock)
3bd1e081
MD
919{
920 ctx->consumer_error_socket = sock;
921}
922
923/*
924 * Set the command socket path.
925 */
3bd1e081
MD
926void lttng_consumer_set_command_sock_path(
927 struct lttng_consumer_local_data *ctx, char *sock)
928{
929 ctx->consumer_command_sock_path = sock;
930}
931
932/*
933 * Send return code to the session daemon.
934 * If the socket is not defined, we return 0, it is not a fatal error
935 */
ffe60014 936int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
3bd1e081
MD
937{
938 if (ctx->consumer_error_socket > 0) {
939 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
940 sizeof(enum lttcomm_sessiond_command));
941 }
942
943 return 0;
944}
945
/*
 * Close all the tracefiles and stream fds and MUST be called when all
 * instances are destroyed i.e. when all threads were joined and are ended.
 *
 * Tears down the global consumer hash tables. The channel deletion is done
 * under the RCU read lock because consumer_del_channel() unlinks nodes from
 * an RCU hash table.
 */
void lttng_consumer_cleanup(void)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;

	rcu_read_lock();

	/* Delete every remaining channel; this also drops their resources. */
	cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
			node.node) {
		consumer_del_channel(channel);
	}

	rcu_read_unlock();

	/* The table is empty at this point; destroy the structure itself. */
	lttng_ht_destroy(consumer_data.channel_ht);

	cleanup_relayd_ht();

	lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);

	/*
	 * This HT contains streams that are freed by either the metadata thread or
	 * the data thread so we do *nothing* on the hash table and simply destroy
	 * it.
	 */
	lttng_ht_destroy(consumer_data.stream_list_ht);
}
977
978/*
979 * Called from signal handler.
980 */
981void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
982{
983 int ret;
984 consumer_quit = 1;
6f94560a
MD
985 do {
986 ret = write(ctx->consumer_should_quit[1], "4", 1);
987 } while (ret < 0 && errno == EINTR);
4cec016f 988 if (ret < 0 || ret != 1) {
7a57cf92 989 PERROR("write consumer quit");
3bd1e081 990 }
ab1027f4
DG
991
992 DBG("Consumer flag that it should quit");
3bd1e081
MD
993}
994
/*
 * Hint the kernel to flush and then drop from the page cache the subbuffers
 * written before the one ending at orig_offset, limiting the consumer's page
 * cache footprint.
 *
 * Errors from the hints are deliberately ignored: they only affect cache
 * behavior, never data integrity.
 */
void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
		off_t orig_offset)
{
	int outfd = stream->out_fd;

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset < stream->max_sb_size) {
		/* Nothing before the first subbuffer to sync. */
		return;
	}
	lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
			stream->max_sb_size,
			SYNC_FILE_RANGE_WAIT_BEFORE
			| SYNC_FILE_RANGE_WRITE
			| SYNC_FILE_RANGE_WAIT_AFTER);
	/*
	 * Give hints to the kernel about how we access the file:
	 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
	 * we write it.
	 *
	 * We need to call fadvise again after the file grows because the
	 * kernel does not seem to apply fadvise to non-existing parts of the
	 * file.
	 *
	 * Call fadvise _after_ having waited for the page writeback to
	 * complete because the dirty page writeback semantic is not well
	 * defined. So it can be expected to lead to lower throughput in
	 * streaming.
	 */
	posix_fadvise(outfd, orig_offset - stream->max_sb_size,
			stream->max_sb_size, POSIX_FADV_DONTNEED);
}
1031
/*
 * Initialise the necessary environnement :
 * - create a new context
 * - create the poll_pipe
 * - create the should_quit pipe (for signal handler)
 * - create the thread pipe (for splice)
 *
 * Takes a function pointer as argument, this function is called when data is
 * available on a buffer. This function is responsible to do the
 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
 * buffer configuration and then kernctl_put_next_subbuf at the end.
 *
 * Returns a pointer to the new context or NULL on error.
 *
 * NOTE: the goto-unwind labels below MUST stay in the reverse order of the
 * resource acquisitions above them; adding a new pipe means adding a new
 * label in the matching position.
 */
struct lttng_consumer_local_data *lttng_consumer_create(
		enum lttng_consumer_type type,
		ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
			struct lttng_consumer_local_data *ctx),
		int (*recv_channel)(struct lttng_consumer_channel *channel),
		int (*recv_stream)(struct lttng_consumer_stream *stream),
		int (*update_stream)(int stream_key, uint32_t state))
{
	int ret;
	struct lttng_consumer_local_data *ctx;

	/* The global consumer type may only be set once per process. */
	assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
			consumer_data.type == type);
	consumer_data.type = type;

	ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
	if (ctx == NULL) {
		PERROR("allocating context");
		goto error;
	}

	/* -1 marks "socket not set"; see lttng_consumer_set_error_sock(). */
	ctx->consumer_error_socket = -1;
	ctx->consumer_metadata_socket = -1;
	/* assign the callbacks */
	ctx->on_buffer_ready = buffer_ready;
	ctx->on_recv_channel = recv_channel;
	ctx->on_recv_stream = recv_stream;
	ctx->on_update_stream = update_stream;

	ctx->consumer_data_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_data_pipe) {
		goto error_poll_pipe;
	}

	ret = pipe(ctx->consumer_should_quit);
	if (ret < 0) {
		PERROR("Error creating recv pipe");
		goto error_quit_pipe;
	}

	ret = pipe(ctx->consumer_thread_pipe);
	if (ret < 0) {
		PERROR("Error creating thread pipe");
		goto error_thread_pipe;
	}

	ret = pipe(ctx->consumer_channel_pipe);
	if (ret < 0) {
		PERROR("Error creating channel pipe");
		goto error_channel_pipe;
	}

	ctx->consumer_metadata_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_metadata_pipe) {
		goto error_metadata_pipe;
	}

	ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
	if (ret < 0) {
		goto error_splice_pipe;
	}

	return ctx;

/* Unwind in strict reverse order of acquisition. */
error_splice_pipe:
	lttng_pipe_destroy(ctx->consumer_metadata_pipe);
error_metadata_pipe:
	utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
	utils_close_pipe(ctx->consumer_thread_pipe);
error_thread_pipe:
	utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
	lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
	free(ctx);
error:
	return NULL;
}
1125
/*
 * Close all fds associated with the instance and free the context.
 *
 * Counterpart of lttng_consumer_create(); must only be called once no other
 * thread uses ctx anymore.
 */
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
{
	int ret;

	DBG("Consumer destroying it. Closing everything.");

	ret = close(ctx->consumer_error_socket);
	if (ret) {
		PERROR("close");
	}
	ret = close(ctx->consumer_metadata_socket);
	if (ret) {
		PERROR("close");
	}
	utils_close_pipe(ctx->consumer_thread_pipe);
	utils_close_pipe(ctx->consumer_channel_pipe);
	lttng_pipe_destroy(ctx->consumer_data_pipe);
	lttng_pipe_destroy(ctx->consumer_metadata_pipe);
	utils_close_pipe(ctx->consumer_should_quit);
	utils_close_pipe(ctx->consumer_splice_metadata_pipe);

	/* Remove the command socket file from the filesystem. */
	unlink(ctx->consumer_command_sock_path);
	free(ctx);
}
1153
6197aea7
DG
1154/*
1155 * Write the metadata stream id on the specified file descriptor.
1156 */
1157static int write_relayd_metadata_id(int fd,
1158 struct lttng_consumer_stream *stream,
ffe60014 1159 struct consumer_relayd_sock_pair *relayd, unsigned long padding)
6197aea7
DG
1160{
1161 int ret;
1d4dfdef 1162 struct lttcomm_relayd_metadata_payload hdr;
6197aea7 1163
1d4dfdef
DG
1164 hdr.stream_id = htobe64(stream->relayd_stream_id);
1165 hdr.padding_size = htobe32(padding);
6197aea7 1166 do {
1d4dfdef 1167 ret = write(fd, (void *) &hdr, sizeof(hdr));
6197aea7 1168 } while (ret < 0 && errno == EINTR);
4cec016f 1169 if (ret < 0 || ret != sizeof(hdr)) {
d7b75ec8
DG
1170 /*
1171 * This error means that the fd's end is closed so ignore the perror
1172 * not to clubber the error output since this can happen in a normal
1173 * code path.
1174 */
1175 if (errno != EPIPE) {
1176 PERROR("write metadata stream id");
1177 }
1178 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
534d2592
DG
1179 /*
1180 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1181 * handle writting the missing part so report that as an error and
1182 * don't lie to the caller.
1183 */
1184 ret = -1;
6197aea7
DG
1185 goto end;
1186 }
1d4dfdef
DG
1187 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1188 stream->relayd_stream_id, padding);
6197aea7
DG
1189
1190end:
1191 return ret;
1192}
1193
/*
 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
 * core function for writing trace buffers to either the local filesystem or
 * the network.
 *
 * It must be called with the stream lock held.
 *
 * Careful review MUST be put if any changes occur!
 *
 * Returns the number of bytes written (which may be negative on error).
 */
ssize_t lttng_consumer_on_read_subbuffer_mmap(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len,
		unsigned long padding)
{
	unsigned long mmap_offset;
	void *mmap_base;
	ssize_t ret = 0, written = 0;
	off_t orig_offset = stream->out_fd_offset;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = NULL;
	unsigned int relayd_hang_up = 0;

	/* RCU lock for the relayd pointer */
	rcu_read_lock();

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != -1) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == NULL) {
			/* Relayd disappeared: nothing to write. */
			goto end;
		}
	}

	/* get the offset inside the fd to mmap */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		mmap_base = stream->mmap_base;
		ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		mmap_base = lttng_ustctl_get_mmap_base(stream);
		if (!mmap_base) {
			ERR("read mmap get mmap base for stream %s", stream->name);
			written = -1;
			goto end;
		}
		ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);

		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
	}
	if (ret != 0) {
		/* Tracer ctl calls return negative errno values. */
		errno = -ret;
		PERROR("tracer ctl get_mmap_read_offset");
		written = ret;
		goto end;
	}

	/* Handle stream on the relayd if the output is on the network */
	if (relayd) {
		unsigned long netlen = len;

		/*
		 * Lock the control socket for the complete duration of the function
		 * since from this point on we will use the socket.
		 */
		if (stream->metadata_flag) {
			/* Metadata requires the control socket. */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			netlen += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, netlen, padding, relayd);
		if (ret >= 0) {
			/* Use the returned socket. */
			outfd = ret;

			/* Write metadata stream id before payload */
			if (stream->metadata_flag) {
				ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
				if (ret < 0) {
					written = ret;
					/* Socket operation failed. We consider the relayd dead */
					if (ret == -EPIPE || ret == -EINVAL) {
						relayd_hang_up = 1;
						goto write_error;
					}
					goto end;
				}
			}
		} else {
			/* Socket operation failed. We consider the relayd dead */
			if (ret == -EPIPE || ret == -EINVAL) {
				relayd_hang_up = 1;
				goto write_error;
			}
			/* Else, use the default set before which is the filesystem. */
		}
	} else {
		/* No streaming, we have to set the len with the full padding */
		len += padding;

		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
				(stream->tracefile_size_current + len) >
				stream->chan->tracefile_size) {
			ret = utils_rotate_stream_file(stream->chan->pathname,
					stream->name, stream->chan->tracefile_size,
					stream->chan->tracefile_count, stream->uid, stream->gid,
					stream->out_fd, &(stream->tracefile_count_current));
			if (ret < 0) {
				ERR("Rotating output file");
				goto end;
			}
			outfd = stream->out_fd = ret;
			/* Reset current size because we just perform a rotation. */
			stream->tracefile_size_current = 0;
		}
		stream->tracefile_size_current += len;
	}

	while (len > 0) {
		do {
			ret = write(outfd, mmap_base + mmap_offset, len);
		} while (ret < 0 && errno == EINTR);
		DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
		if (ret < 0) {
			/*
			 * This is possible if the fd is closed on the other side (outfd)
			 * or any write problem. It can be verbose a bit for a normal
			 * execution if for instance the relayd is stopped abruptly. This
			 * can happen so set this to a DBG statement.
			 */
			DBG("Error in file write mmap");
			if (written == 0) {
				written = ret;
			}
			/* Socket operation failed. We consider the relayd dead */
			if (errno == EPIPE || errno == EINVAL) {
				relayd_hang_up = 1;
				goto write_error;
			}
			goto end;
		} else if (ret > len) {
			PERROR("Error in file write (ret %zd > len %lu)", ret, len);
			written += ret;
			goto end;
		} else {
			len -= ret;
			mmap_offset += ret;
		}

		/* This call is useless on a socket so better save a syscall. */
		if (!relayd) {
			/* This won't block, but will start writeout asynchronously */
			lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
					SYNC_FILE_RANGE_WRITE);
			stream->out_fd_offset += ret;
		}
		written += ret;
	}
	lttng_consumer_sync_trace_file(stream, orig_offset);

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		cleanup_relayd(relayd, ctx);
	}

end:
	/* Unlock only if ctrl socket used */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	rcu_read_unlock();
	return written;
}
1383
/*
 * Splice the data from the ring buffer to the tracefile.
 *
 * It must be called with the stream lock held.
 *
 * Kernel consumer only; UST buffers cannot be spliced (-ENOSYS).
 *
 * Returns the number of bytes spliced (negative on error).
 */
ssize_t lttng_consumer_on_read_subbuffer_splice(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len,
		unsigned long padding)
{
	ssize_t ret = 0, written = 0, ret_splice = 0;
	loff_t offset = 0;
	off_t orig_offset = stream->out_fd_offset;
	int fd = stream->wait_fd;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = NULL;
	int *splice_pipe;
	unsigned int relayd_hang_up = 0;

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/* Not supported for user space tracing */
		return -ENOSYS;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
	}

	/* RCU lock for the relayd pointer */
	rcu_read_lock();

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != -1) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == NULL) {
			goto end;
		}
	}

	/*
	 * Choose right pipe for splice. Metadata and trace data are handled by
	 * different threads hence the use of two pipes in order not to race or
	 * corrupt the written data.
	 */
	if (stream->metadata_flag) {
		splice_pipe = ctx->consumer_splice_metadata_pipe;
	} else {
		splice_pipe = ctx->consumer_thread_pipe;
	}

	/* Write metadata stream id before payload */
	if (relayd) {
		int total_len = len;

		if (stream->metadata_flag) {
			/*
			 * Lock the control socket for the complete duration of the function
			 * since from this point on we will use the socket.
			 */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);

			ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
					padding);
			if (ret < 0) {
				written = ret;
				/* Socket operation failed. We consider the relayd dead */
				if (ret == -EBADF) {
					WARN("Remote relayd disconnected. Stopping");
					relayd_hang_up = 1;
					goto write_error;
				}
				goto end;
			}

			total_len += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, total_len, padding, relayd);
		if (ret >= 0) {
			/* Use the returned socket. */
			outfd = ret;
		} else {
			/* Socket operation failed. We consider the relayd dead */
			if (ret == -EBADF) {
				WARN("Remote relayd disconnected. Stopping");
				relayd_hang_up = 1;
				goto write_error;
			}
			goto end;
		}
	} else {
		/* No streaming, we have to set the len with the full padding */
		len += padding;

		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
				(stream->tracefile_size_current + len) >
				stream->chan->tracefile_size) {
			ret = utils_rotate_stream_file(stream->chan->pathname,
					stream->name, stream->chan->tracefile_size,
					stream->chan->tracefile_count, stream->uid, stream->gid,
					stream->out_fd, &(stream->tracefile_count_current));
			if (ret < 0) {
				ERR("Rotating output file");
				goto end;
			}
			outfd = stream->out_fd = ret;
			/* Reset current size because we just perform a rotation. */
			stream->tracefile_size_current = 0;
		}
		stream->tracefile_size_current += len;
	}

	while (len > 0) {
		/* Stage 1: ring buffer fd -> intermediate pipe. */
		DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
				(unsigned long)offset, len, fd, splice_pipe[1]);
		ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice chan to pipe, ret %zd", ret_splice);
		if (ret_splice < 0) {
			PERROR("Error in relay splice");
			if (written == 0) {
				written = ret_splice;
			}
			ret = errno;
			goto splice_error;
		}

		/* Handle stream on the relayd if the output is on the network */
		if (relayd) {
			if (stream->metadata_flag) {
				size_t metadata_payload_size =
					sizeof(struct lttcomm_relayd_metadata_payload);

				/* Update counter to fit the spliced data */
				ret_splice += metadata_payload_size;
				len += metadata_payload_size;
				/*
				 * We do this so the return value can match the len passed as
				 * argument to this function.
				 */
				written -= metadata_payload_size;
			}
		}

		/* Splice data out: intermediate pipe -> outfd (file or socket). */
		ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
				ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("Consumer splice pipe to file, ret %zd", ret_splice);
		if (ret_splice < 0) {
			PERROR("Error in file splice");
			if (written == 0) {
				written = ret_splice;
			}
			/* Socket operation failed. We consider the relayd dead */
			if (errno == EBADF || errno == EPIPE) {
				WARN("Remote relayd disconnected. Stopping");
				relayd_hang_up = 1;
				goto write_error;
			}
			ret = errno;
			goto splice_error;
		} else if (ret_splice > len) {
			errno = EINVAL;
			PERROR("Wrote more data than requested %zd (len: %lu)",
					ret_splice, len);
			written += ret_splice;
			ret = errno;
			goto splice_error;
		}
		len -= ret_splice;

		/* This call is useless on a socket so better save a syscall. */
		if (!relayd) {
			/* This won't block, but will start writeout asynchronously */
			lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
					SYNC_FILE_RANGE_WRITE);
			stream->out_fd_offset += ret_splice;
		}
		written += ret_splice;
	}
	lttng_consumer_sync_trace_file(stream, orig_offset);

	ret = ret_splice;

	goto end;

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		cleanup_relayd(relayd, ctx);
		/* Skip splice error so the consumer does not fail */
		goto end;
	}

splice_error:
	/* send the appropriate error description to sessiond */
	switch (ret) {
	case EINVAL:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
		break;
	case ENOMEM:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
		break;
	case ESPIPE:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
		break;
	}

end:
	/* Unlock only if ctrl socket used */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	rcu_read_unlock();
	return written;
}
1612
1613/*
1614 * Take a snapshot for a specific fd
1615 *
1616 * Returns 0 on success, < 0 on error
1617 */
ffe60014 1618int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081
MD
1619{
1620 switch (consumer_data.type) {
1621 case LTTNG_CONSUMER_KERNEL:
ffe60014 1622 return lttng_kconsumer_take_snapshot(stream);
7753dea8
MD
1623 case LTTNG_CONSUMER32_UST:
1624 case LTTNG_CONSUMER64_UST:
ffe60014 1625 return lttng_ustconsumer_take_snapshot(stream);
3bd1e081
MD
1626 default:
1627 ERR("Unknown consumer_data type");
1628 assert(0);
1629 return -ENOSYS;
1630 }
3bd1e081
MD
1631}
1632
1633/*
1634 * Get the produced position
1635 *
1636 * Returns 0 on success, < 0 on error
1637 */
ffe60014 1638int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
3bd1e081
MD
1639 unsigned long *pos)
1640{
1641 switch (consumer_data.type) {
1642 case LTTNG_CONSUMER_KERNEL:
ffe60014 1643 return lttng_kconsumer_get_produced_snapshot(stream, pos);
7753dea8
MD
1644 case LTTNG_CONSUMER32_UST:
1645 case LTTNG_CONSUMER64_UST:
ffe60014 1646 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
3bd1e081
MD
1647 default:
1648 ERR("Unknown consumer_data type");
1649 assert(0);
1650 return -ENOSYS;
1651 }
1652}
1653
1654int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1655 int sock, struct pollfd *consumer_sockpoll)
1656{
1657 switch (consumer_data.type) {
1658 case LTTNG_CONSUMER_KERNEL:
1659 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
1660 case LTTNG_CONSUMER32_UST:
1661 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1662 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1663 default:
1664 ERR("Unknown consumer_data type");
1665 assert(0);
1666 return -ENOSYS;
1667 }
1668}
1669
43c34bc3
DG
1670/*
1671 * Iterate over all streams of the hashtable and free them properly.
1672 *
1673 * WARNING: *MUST* be used with data stream only.
1674 */
1675static void destroy_data_stream_ht(struct lttng_ht *ht)
1676{
43c34bc3
DG
1677 struct lttng_ht_iter iter;
1678 struct lttng_consumer_stream *stream;
1679
1680 if (ht == NULL) {
1681 return;
1682 }
1683
1684 rcu_read_lock();
1685 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
5c540210
DG
1686 /*
1687 * Ignore return value since we are currently cleaning up so any error
1688 * can't be handled.
1689 */
1690 (void) consumer_del_stream(stream, ht);
43c34bc3
DG
1691 }
1692 rcu_read_unlock();
1693
1694 lttng_ht_destroy(ht);
1695}
1696
/*
 * Iterate over all streams of the hashtable and free them properly.
 *
 * XXX: Should not be only for metadata stream or else use an other name.
 *
 * Deletion happens inside the RCU read-side critical section because
 * consumer_del_metadata_stream() unlinks nodes from the RCU hash table.
 */
static void destroy_stream_ht(struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	if (ht == NULL) {
		/* Nothing was ever added; nothing to destroy. */
		return;
	}

	rcu_read_lock();
	cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
		/*
		 * Ignore return value since we are currently cleaning up so any error
		 * can't be handled.
		 */
		(void) consumer_del_metadata_stream(stream, ht);
	}
	rcu_read_unlock();

	lttng_ht_destroy(ht);
}
1723
d88aee68
DG
1724void lttng_consumer_close_metadata(void)
1725{
1726 switch (consumer_data.type) {
1727 case LTTNG_CONSUMER_KERNEL:
1728 /*
1729 * The Kernel consumer has a different metadata scheme so we don't
1730 * close anything because the stream will be closed by the session
1731 * daemon.
1732 */
1733 break;
1734 case LTTNG_CONSUMER32_UST:
1735 case LTTNG_CONSUMER64_UST:
1736 /*
1737 * Close all metadata streams. The metadata hash table is passed and
1738 * this call iterates over it by closing all wakeup fd. This is safe
1739 * because at this point we are sure that the metadata producer is
1740 * either dead or blocked.
1741 */
1742 lttng_ustconsumer_close_metadata(metadata_ht);
1743 break;
1744 default:
1745 ERR("Unknown consumer_data type");
1746 assert(0);
1747 }
1748}
1749
/*
 * Clean up a metadata stream and free its memory.
 *
 * Removes the stream from all global hash tables, releases its fds/mappings,
 * notifies and possibly destroys the associated relayd pair, and triggers
 * channel deletion when this was the channel's last reference.
 *
 * Locking order (must not change): consumer_data.lock, then stream->lock.
 * The stream memory itself is reclaimed through call_rcu().
 */
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *free_chan = NULL;
	struct consumer_relayd_sock_pair *relayd;

	assert(stream);
	/*
	 * This call should NEVER receive regular stream. It must always be
	 * metadata stream and this is crucial for data structure synchronization.
	 */
	assert(stream->metadata_flag);

	DBG3("Consumer delete metadata stream %d", stream->wait_fd);

	if (ht == NULL) {
		/* Means the stream was allocated but not successfully added */
		goto free_stream_rcu;
	}

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->lock);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		if (stream->mmap_base != NULL) {
			ret = munmap(stream->mmap_base, stream->mmap_len);
			if (ret != 0) {
				PERROR("munmap metadata stream");
			}
		}

		if (stream->wait_fd >= 0) {
			ret = close(stream->wait_fd);
			if (ret < 0) {
				PERROR("close kernel metadata wait_fd");
			}
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_stream(stream);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	/* Remove the stream from every global hash table it was added to. */
	rcu_read_lock();
	iter.iter.node = &stream->node.node;
	ret = lttng_ht_del(ht, &iter);
	assert(!ret);

	iter.iter.node = &stream->node_channel_id.node;
	ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
	assert(!ret);

	iter.iter.node = &stream->node_session_id.node;
	ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
	assert(!ret);
	rcu_read_unlock();

	if (stream->out_fd >= 0) {
		ret = close(stream->out_fd);
		if (ret) {
			PERROR("close");
		}
	}

	/* Check and cleanup relayd */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		uatomic_dec(&relayd->refcount);
		assert(uatomic_read(&relayd->refcount) >= 0);

		/* Closing streams requires to lock the control socket. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_send_close_stream(&relayd->control_sock,
				stream->relayd_stream_id, stream->next_net_seq_num - 1);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			DBG("Unable to close stream on the relayd. Continuing");
			/*
			 * Continue here. There is nothing we can do for the relayd.
			 * Chances are that the relayd has closed the socket so we just
			 * continue cleaning up.
			 */
		}

		/* Both conditions are met, we destroy the relayd. */
		if (uatomic_read(&relayd->refcount) == 0 &&
				uatomic_read(&relayd->destroy_flag)) {
			consumer_destroy_relayd(relayd);
		}
	}
	rcu_read_unlock();

	/* Atomically decrement channel refcount since other threads can use it. */
	if (!uatomic_sub_return(&stream->chan->refcount, 1)
			&& !uatomic_read(&stream->chan->nb_init_stream_left)) {
		/* Go for channel deletion! */
		free_chan = stream->chan;
	}

end:
	/*
	 * Nullify the stream reference so it is not used after deletion. The
	 * consumer data lock MUST be acquired before being able to check for a
	 * NULL pointer value.
	 */
	stream->chan->metadata_stream = NULL;

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	if (free_chan) {
		consumer_del_channel(free_chan);
	}

free_stream_rcu:
	call_rcu(&stream->node.head, free_stream_rcu);
}
1879
/*
 * Action done with the metadata stream when adding it to the consumer internal
 * data structures to handle it.
 *
 * Registers the stream in the given metadata hash table and in the global
 * per-channel-id and per-session-id tables, taking a reference on the
 * associated relayd if any.
 *
 * Locking order: consumer_data.lock then stream->lock, same as deletion.
 * Returns 0 (kept as int for symmetry with other add paths).
 */
static int add_metadata_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;

	assert(stream);
	assert(ht);

	DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->lock);

	/*
	 * From here, refcounts are updated so be _careful_ when returning an error
	 * after this point.
	 */

	rcu_read_lock();

	/*
	 * Lookup the stream just to make sure it does not exist in our internal
	 * state. This should NEVER happen.
	 */
	lttng_ht_lookup(ht, &stream->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	assert(!node);

	/* Find relayd and, if one is found, increment refcount. */
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		uatomic_inc(&relayd->refcount);
	}

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
			&stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

	rcu_read_unlock();

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&consumer_data.lock);
	return ret;
}
1952
8994307f
DG
1953/*
1954 * Delete data stream that are flagged for deletion (endpoint_status).
1955 */
1956static void validate_endpoint_status_data_stream(void)
1957{
1958 struct lttng_ht_iter iter;
1959 struct lttng_consumer_stream *stream;
1960
1961 DBG("Consumer delete flagged data stream");
1962
1963 rcu_read_lock();
1964 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
1965 /* Validate delete flag of the stream */
79d4ffb7 1966 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
8994307f
DG
1967 continue;
1968 }
1969 /* Delete it right now */
1970 consumer_del_stream(stream, data_ht);
1971 }
1972 rcu_read_unlock();
1973}
1974
1975/*
1976 * Delete metadata stream that are flagged for deletion (endpoint_status).
1977 */
1978static void validate_endpoint_status_metadata_stream(
1979 struct lttng_poll_event *pollset)
1980{
1981 struct lttng_ht_iter iter;
1982 struct lttng_consumer_stream *stream;
1983
1984 DBG("Consumer delete flagged metadata stream");
1985
1986 assert(pollset);
1987
1988 rcu_read_lock();
1989 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
1990 /* Validate delete flag of the stream */
79d4ffb7 1991 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
8994307f
DG
1992 continue;
1993 }
1994 /*
1995 * Remove from pollset so the metadata thread can continue without
1996 * blocking on a deleted stream.
1997 */
1998 lttng_poll_del(pollset, stream->wait_fd);
1999
2000 /* Delete it right now */
2001 consumer_del_metadata_stream(stream, metadata_ht);
2002 }
2003 rcu_read_unlock();
2004}
2005
fb3a43a9
DG
2006/*
2007 * Thread polls on metadata file descriptor and write them on disk or on the
2008 * network.
2009 */
7d980def 2010void *consumer_thread_metadata_poll(void *data)
fb3a43a9
DG
2011{
2012 int ret, i, pollfd;
2013 uint32_t revents, nb_fd;
e316aad5 2014 struct lttng_consumer_stream *stream = NULL;
fb3a43a9 2015 struct lttng_ht_iter iter;
d88aee68 2016 struct lttng_ht_node_u64 *node;
fb3a43a9
DG
2017 struct lttng_poll_event events;
2018 struct lttng_consumer_local_data *ctx = data;
2019 ssize_t len;
2020
2021 rcu_register_thread();
2022
d88aee68 2023 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
04bb2b64
DG
2024 if (!metadata_ht) {
2025 /* ENOMEM at this point. Better to bail out. */
d8ef542d 2026 goto end_ht;
04bb2b64
DG
2027 }
2028
fb3a43a9
DG
2029 DBG("Thread metadata poll started");
2030
fb3a43a9
DG
2031 /* Size is set to 1 for the consumer_metadata pipe */
2032 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2033 if (ret < 0) {
2034 ERR("Poll set creation failed");
d8ef542d 2035 goto end_poll;
fb3a43a9
DG
2036 }
2037
13886d2d
DG
2038 ret = lttng_poll_add(&events,
2039 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
fb3a43a9
DG
2040 if (ret < 0) {
2041 goto end;
2042 }
2043
2044 /* Main loop */
2045 DBG("Metadata main loop started");
2046
2047 while (1) {
fb3a43a9 2048 /* Only the metadata pipe is set */
d21b0d71 2049 if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
fb3a43a9
DG
2050 goto end;
2051 }
2052
2053restart:
d21b0d71 2054 DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
fb3a43a9
DG
2055 ret = lttng_poll_wait(&events, -1);
2056 DBG("Metadata event catched in thread");
2057 if (ret < 0) {
2058 if (errno == EINTR) {
e316aad5 2059 ERR("Poll EINTR catched");
fb3a43a9
DG
2060 goto restart;
2061 }
2062 goto error;
2063 }
2064
0d9c5d77
DG
2065 nb_fd = ret;
2066
e316aad5 2067 /* From here, the event is a metadata wait fd */
fb3a43a9
DG
2068 for (i = 0; i < nb_fd; i++) {
2069 revents = LTTNG_POLL_GETEV(&events, i);
2070 pollfd = LTTNG_POLL_GETFD(&events, i);
2071
e316aad5
DG
2072 /* Just don't waste time if no returned events for the fd */
2073 if (!revents) {
2074 continue;
2075 }
2076
13886d2d 2077 if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
4adabd61 2078 if (revents & (LPOLLERR | LPOLLHUP )) {
fb3a43a9
DG
2079 DBG("Metadata thread pipe hung up");
2080 /*
2081 * Remove the pipe from the poll set and continue the loop
2082 * since their might be data to consume.
2083 */
13886d2d
DG
2084 lttng_poll_del(&events,
2085 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
2086 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
fb3a43a9
DG
2087 continue;
2088 } else if (revents & LPOLLIN) {
13886d2d
DG
2089 ssize_t pipe_len;
2090
2091 pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
2092 &stream, sizeof(stream));
2093 if (pipe_len < 0) {
2094 ERR("read metadata stream, ret: %ld", pipe_len);
fb3a43a9 2095 /*
13886d2d 2096 * Continue here to handle the rest of the streams.
fb3a43a9
DG
2097 */
2098 continue;
2099 }
2100
8994307f
DG
2101 /* A NULL stream means that the state has changed. */
2102 if (stream == NULL) {
2103 /* Check for deleted streams. */
2104 validate_endpoint_status_metadata_stream(&events);
3714380f 2105 goto restart;
8994307f
DG
2106 }
2107
fb3a43a9
DG
2108 DBG("Adding metadata stream %d to poll set",
2109 stream->wait_fd);
2110
ffe60014 2111 ret = add_metadata_stream(stream, metadata_ht);
e316aad5
DG
2112 if (ret) {
2113 ERR("Unable to add metadata stream");
2114 /* Stream was not setup properly. Continuing. */
2115 consumer_del_metadata_stream(stream, NULL);
2116 continue;
2117 }
fb3a43a9
DG
2118
2119 /* Add metadata stream to the global poll events list */
2120 lttng_poll_add(&events, stream->wait_fd,
2121 LPOLLIN | LPOLLPRI);
fb3a43a9
DG
2122 }
2123
e316aad5 2124 /* Handle other stream */
fb3a43a9
DG
2125 continue;
2126 }
2127
d09e1200 2128 rcu_read_lock();
d88aee68
DG
2129 {
2130 uint64_t tmp_id = (uint64_t) pollfd;
2131
2132 lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
2133 }
2134 node = lttng_ht_iter_get_node_u64(&iter);
e316aad5 2135 assert(node);
fb3a43a9
DG
2136
2137 stream = caa_container_of(node, struct lttng_consumer_stream,
58b1f425 2138 node);
fb3a43a9 2139
e316aad5 2140 /* Check for error event */
4adabd61 2141 if (revents & (LPOLLERR | LPOLLHUP)) {
e316aad5 2142 DBG("Metadata fd %d is hup|err.", pollfd);
fb3a43a9
DG
2143 if (!stream->hangup_flush_done
2144 && (consumer_data.type == LTTNG_CONSUMER32_UST
2145 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2146 DBG("Attempting to flush and consume the UST buffers");
2147 lttng_ustconsumer_on_stream_hangup(stream);
2148
2149 /* We just flushed the stream now read it. */
4bb94b75
DG
2150 do {
2151 len = ctx->on_buffer_ready(stream, ctx);
2152 /*
2153 * We don't check the return value here since if we get
2154 * a negative len, it means an error occured thus we
2155 * simply remove it from the poll set and free the
2156 * stream.
2157 */
2158 } while (len > 0);
fb3a43a9
DG
2159 }
2160
fb3a43a9 2161 lttng_poll_del(&events, stream->wait_fd);
e316aad5
DG
2162 /*
2163 * This call update the channel states, closes file descriptors
2164 * and securely free the stream.
2165 */
2166 consumer_del_metadata_stream(stream, metadata_ht);
2167 } else if (revents & (LPOLLIN | LPOLLPRI)) {
2168 /* Get the data out of the metadata file descriptor */
2169 DBG("Metadata available on fd %d", pollfd);
2170 assert(stream->wait_fd == pollfd);
2171
2172 len = ctx->on_buffer_ready(stream, ctx);
2173 /* It's ok to have an unavailable sub-buffer */
b64403e3 2174 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2175 /* Clean up stream from consumer and free it. */
2176 lttng_poll_del(&events, stream->wait_fd);
2177 consumer_del_metadata_stream(stream, metadata_ht);
e316aad5
DG
2178 } else if (len > 0) {
2179 stream->data_read = 1;
2180 }
fb3a43a9 2181 }
e316aad5
DG
2182
2183 /* Release RCU lock for the stream looked up */
d09e1200 2184 rcu_read_unlock();
fb3a43a9
DG
2185 }
2186 }
2187
2188error:
2189end:
2190 DBG("Metadata poll thread exiting");
fb3a43a9 2191
d8ef542d
MD
2192 lttng_poll_clean(&events);
2193end_poll:
04bb2b64 2194 destroy_stream_ht(metadata_ht);
d8ef542d 2195end_ht:
fb3a43a9
DG
2196 rcu_unregister_thread();
2197 return NULL;
2198}
2199
3bd1e081 2200/*
e4421fec 2201 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
2202 * it to tracefile if necessary.
2203 */
7d980def 2204void *consumer_thread_data_poll(void *data)
3bd1e081
MD
2205{
2206 int num_rdy, num_hup, high_prio, ret, i;
2207 struct pollfd *pollfd = NULL;
2208 /* local view of the streams */
c869f647 2209 struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
3bd1e081
MD
2210 /* local view of consumer_data.fds_count */
2211 int nb_fd = 0;
3bd1e081 2212 struct lttng_consumer_local_data *ctx = data;
00e2e675 2213 ssize_t len;
3bd1e081 2214
e7b994a3
DG
2215 rcu_register_thread();
2216
d88aee68 2217 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
43c34bc3 2218 if (data_ht == NULL) {
04bb2b64 2219 /* ENOMEM at this point. Better to bail out. */
43c34bc3
DG
2220 goto end;
2221 }
2222
effcf122 2223 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
3bd1e081
MD
2224
2225 while (1) {
2226 high_prio = 0;
2227 num_hup = 0;
2228
2229 /*
e4421fec 2230 * the fds set has been updated, we need to update our
3bd1e081
MD
2231 * local array as well
2232 */
2233 pthread_mutex_lock(&consumer_data.lock);
2234 if (consumer_data.need_update) {
0e428499
DG
2235 free(pollfd);
2236 pollfd = NULL;
2237
2238 free(local_stream);
2239 local_stream = NULL;
3bd1e081 2240
50f8ae69 2241 /* allocate for all fds + 1 for the consumer_data_pipe */
effcf122 2242 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
3bd1e081 2243 if (pollfd == NULL) {
7a57cf92 2244 PERROR("pollfd malloc");
3bd1e081
MD
2245 pthread_mutex_unlock(&consumer_data.lock);
2246 goto end;
2247 }
2248
50f8ae69 2249 /* allocate for all fds + 1 for the consumer_data_pipe */
effcf122 2250 local_stream = zmalloc((consumer_data.stream_count + 1) *
747f8642 2251 sizeof(struct lttng_consumer_stream *));
3bd1e081 2252 if (local_stream == NULL) {
7a57cf92 2253 PERROR("local_stream malloc");
3bd1e081
MD
2254 pthread_mutex_unlock(&consumer_data.lock);
2255 goto end;
2256 }
ffe60014 2257 ret = update_poll_array(ctx, &pollfd, local_stream,
43c34bc3 2258 data_ht);
3bd1e081
MD
2259 if (ret < 0) {
2260 ERR("Error in allocating pollfd or local_outfds");
f73fabfd 2261 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2262 pthread_mutex_unlock(&consumer_data.lock);
2263 goto end;
2264 }
2265 nb_fd = ret;
2266 consumer_data.need_update = 0;
2267 }
2268 pthread_mutex_unlock(&consumer_data.lock);
2269
4078b776
MD
2270 /* No FDs and consumer_quit, consumer_cleanup the thread */
2271 if (nb_fd == 0 && consumer_quit == 1) {
2272 goto end;
2273 }
3bd1e081 2274 /* poll on the array of fds */
88f2b785 2275 restart:
3bd1e081 2276 DBG("polling on %d fd", nb_fd + 1);
cb365c03 2277 num_rdy = poll(pollfd, nb_fd + 1, -1);
3bd1e081
MD
2278 DBG("poll num_rdy : %d", num_rdy);
2279 if (num_rdy == -1) {
88f2b785
MD
2280 /*
2281 * Restart interrupted system call.
2282 */
2283 if (errno == EINTR) {
2284 goto restart;
2285 }
7a57cf92 2286 PERROR("Poll error");
f73fabfd 2287 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2288 goto end;
2289 } else if (num_rdy == 0) {
2290 DBG("Polling thread timed out");
2291 goto end;
2292 }
2293
3bd1e081 2294 /*
50f8ae69 2295 * If the consumer_data_pipe triggered poll go directly to the
00e2e675
DG
2296 * beginning of the loop to update the array. We want to prioritize
2297 * array update over low-priority reads.
3bd1e081 2298 */
509bb1cf 2299 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
ab30f567 2300 ssize_t pipe_readlen;
04fdd819 2301
50f8ae69 2302 DBG("consumer_data_pipe wake up");
acdb9057
DG
2303 pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
2304 &new_stream, sizeof(new_stream));
23f5f35d 2305 if (pipe_readlen < 0) {
acdb9057 2306 ERR("Consumer data pipe ret %ld", pipe_readlen);
23f5f35d
DG
2307 /* Continue so we can at least handle the current stream(s). */
2308 continue;
2309 }
c869f647
DG
2310
2311 /*
2312 * If the stream is NULL, just ignore it. It's also possible that
2313 * the sessiond poll thread changed the consumer_quit state and is
2314 * waking us up to test it.
2315 */
2316 if (new_stream == NULL) {
8994307f 2317 validate_endpoint_status_data_stream();
c869f647
DG
2318 continue;
2319 }
2320
ffe60014 2321 ret = add_stream(new_stream, data_ht);
c869f647 2322 if (ret) {
d88aee68 2323 ERR("Consumer add stream %" PRIu64 " failed. Continuing",
c869f647
DG
2324 new_stream->key);
2325 /*
2326 * At this point, if the add_stream fails, it is not in the
2327 * hash table thus passing the NULL value here.
2328 */
2329 consumer_del_stream(new_stream, NULL);
2330 }
2331
2332 /* Continue to update the local streams and handle prio ones */
3bd1e081
MD
2333 continue;
2334 }
2335
2336 /* Take care of high priority channels first. */
2337 for (i = 0; i < nb_fd; i++) {
9617607b
DG
2338 if (local_stream[i] == NULL) {
2339 continue;
2340 }
fb3a43a9 2341 if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
2342 DBG("Urgent read on fd %d", pollfd[i].fd);
2343 high_prio = 1;
4078b776 2344 len = ctx->on_buffer_ready(local_stream[i], ctx);
d41f73b7 2345 /* it's ok to have an unavailable sub-buffer */
b64403e3 2346 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2347 /* Clean the stream and free it. */
2348 consumer_del_stream(local_stream[i], data_ht);
9617607b 2349 local_stream[i] = NULL;
4078b776
MD
2350 } else if (len > 0) {
2351 local_stream[i]->data_read = 1;
d41f73b7 2352 }
3bd1e081
MD
2353 }
2354 }
2355
4078b776
MD
2356 /*
2357 * If we read high prio channel in this loop, try again
2358 * for more high prio data.
2359 */
2360 if (high_prio) {
3bd1e081
MD
2361 continue;
2362 }
2363
2364 /* Take care of low priority channels. */
4078b776 2365 for (i = 0; i < nb_fd; i++) {
9617607b
DG
2366 if (local_stream[i] == NULL) {
2367 continue;
2368 }
4078b776
MD
2369 if ((pollfd[i].revents & POLLIN) ||
2370 local_stream[i]->hangup_flush_done) {
4078b776
MD
2371 DBG("Normal read on fd %d", pollfd[i].fd);
2372 len = ctx->on_buffer_ready(local_stream[i], ctx);
2373 /* it's ok to have an unavailable sub-buffer */
b64403e3 2374 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2375 /* Clean the stream and free it. */
2376 consumer_del_stream(local_stream[i], data_ht);
9617607b 2377 local_stream[i] = NULL;
4078b776
MD
2378 } else if (len > 0) {
2379 local_stream[i]->data_read = 1;
2380 }
2381 }
2382 }
2383
2384 /* Handle hangup and errors */
2385 for (i = 0; i < nb_fd; i++) {
9617607b
DG
2386 if (local_stream[i] == NULL) {
2387 continue;
2388 }
4078b776
MD
2389 if (!local_stream[i]->hangup_flush_done
2390 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
2391 && (consumer_data.type == LTTNG_CONSUMER32_UST
2392 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2393 DBG("fd %d is hup|err|nval. Attempting flush and read.",
9617607b 2394 pollfd[i].fd);
4078b776
MD
2395 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2396 /* Attempt read again, for the data we just flushed. */
2397 local_stream[i]->data_read = 1;
2398 }
2399 /*
2400 * If the poll flag is HUP/ERR/NVAL and we have
2401 * read no data in this pass, we can remove the
2402 * stream from its hash table.
2403 */
2404 if ((pollfd[i].revents & POLLHUP)) {
2405 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
2406 if (!local_stream[i]->data_read) {
43c34bc3 2407 consumer_del_stream(local_stream[i], data_ht);
9617607b 2408 local_stream[i] = NULL;
4078b776
MD
2409 num_hup++;
2410 }
2411 } else if (pollfd[i].revents & POLLERR) {
2412 ERR("Error returned in polling fd %d.", pollfd[i].fd);
2413 if (!local_stream[i]->data_read) {
43c34bc3 2414 consumer_del_stream(local_stream[i], data_ht);
9617607b 2415 local_stream[i] = NULL;
4078b776
MD
2416 num_hup++;
2417 }
2418 } else if (pollfd[i].revents & POLLNVAL) {
2419 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
2420 if (!local_stream[i]->data_read) {
43c34bc3 2421 consumer_del_stream(local_stream[i], data_ht);
9617607b 2422 local_stream[i] = NULL;
4078b776 2423 num_hup++;
3bd1e081
MD
2424 }
2425 }
9617607b
DG
2426 if (local_stream[i] != NULL) {
2427 local_stream[i]->data_read = 0;
2428 }
3bd1e081
MD
2429 }
2430 }
2431end:
2432 DBG("polling thread exiting");
0e428499
DG
2433 free(pollfd);
2434 free(local_stream);
fb3a43a9
DG
2435
2436 /*
2437 * Close the write side of the pipe so epoll_wait() in
7d980def
DG
2438 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2439 * read side of the pipe. If we close them both, epoll_wait strangely does
2440 * not return and could create a endless wait period if the pipe is the
2441 * only tracked fd in the poll set. The thread will take care of closing
2442 * the read side.
fb3a43a9 2443 */
13886d2d 2444 (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
fb3a43a9 2445
04bb2b64 2446 destroy_data_stream_ht(data_ht);
43c34bc3 2447
e7b994a3 2448 rcu_unregister_thread();
3bd1e081
MD
2449 return NULL;
2450}
2451
d8ef542d
MD
2452/*
2453 * Close wake-up end of each stream belonging to the channel. This will
2454 * allow the poll() on the stream read-side to detect when the
2455 * write-side (application) finally closes them.
2456 */
static
void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;

	/* Iterate over every stream indexed under this channel's key. */
	ht = consumer_data.stream_per_chan_id_ht;

	rcu_read_lock();
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed),
			ht->match_fct, &channel->key,
			&iter.iter, stream, node_channel_id.node) {
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		/* Stream already being torn down by another thread; skip it. */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}
		switch (consumer_data.type) {
		case LTTNG_CONSUMER_KERNEL:
			/* Kernel streams need no wakeup-fd close here. */
			break;
		case LTTNG_CONSUMER32_UST:
		case LTTNG_CONSUMER64_UST:
			/*
			 * Note: a mutex is taken internally within
			 * liblttng-ust-ctl to protect timer wakeup_fd
			 * use from concurrent close.
			 */
			lttng_ustconsumer_close_stream_wakeup(stream);
			break;
		default:
			ERR("Unknown consumer_data type");
			assert(0);
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
	rcu_read_unlock();
}
2499
2500static void destroy_channel_ht(struct lttng_ht *ht)
2501{
2502 struct lttng_ht_iter iter;
2503 struct lttng_consumer_channel *channel;
2504 int ret;
2505
2506 if (ht == NULL) {
2507 return;
2508 }
2509
2510 rcu_read_lock();
2511 cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
2512 ret = lttng_ht_del(ht, &iter);
2513 assert(ret != 0);
2514 }
2515 rcu_read_unlock();
2516
2517 lttng_ht_destroy(ht);
2518}
2519
2520/*
2521 * This thread polls the channel fds to detect when they are being
2522 * closed. It closes all related streams if the channel is detected as
2523 * closed. It is currently only used as a shim layer for UST because the
2524 * consumerd needs to keep the per-stream wakeup end of pipes open for
2525 * periodical flush.
2526 */
void *consumer_thread_channel_poll(void *data)
{
	int ret, i, pollfd;
	uint32_t revents, nb_fd;
	struct lttng_consumer_channel *chan = NULL;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = data;
	struct lttng_ht *channel_ht;	/* channels indexed by wait fd */

	rcu_register_thread();

	channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!channel_ht) {
		/* ENOMEM at this point. Better to bail out. */
		goto end_ht;
	}

	DBG("Thread channel poll started");

	/* Size is set to 1 for the consumer_channel pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Channel main loop started");

	while (1) {
		/* Only the channel pipe is set; nothing left to do once it's gone. */
		if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
			goto end;
		}

restart:
		DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		ret = lttng_poll_wait(&events, -1);
		DBG("Channel event catched in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR catched");
				goto restart;
			}
			goto end;
		}

		nb_fd = ret;

		/* From here, the event is a channel wait fd */
		for (i = 0; i < nb_fd; i++) {
			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			/* Just don't waste time if no returned events for the fd */
			if (!revents) {
				continue;
			}
			if (pollfd == ctx->consumer_channel_pipe[0]) {
				if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Channel thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
					continue;
				} else if (revents & LPOLLIN) {
					enum consumer_channel_action action;
					uint64_t key;

					/* Commands arrive on the pipe: ADD, DEL or QUIT. */
					ret = read_channel_pipe(ctx, &chan, &key, &action);
					if (ret <= 0) {
						ERR("Error reading channel pipe");
						continue;
					}

					switch (action) {
					case CONSUMER_CHANNEL_ADD:
						DBG("Adding channel %d to poll set",
							chan->wait_fd);

						lttng_ht_node_init_u64(&chan->wait_fd_node,
							chan->wait_fd);
						rcu_read_lock();
						lttng_ht_add_unique_u64(channel_ht,
								&chan->wait_fd_node);
						rcu_read_unlock();
						/* Add channel to the global poll events list */
						lttng_poll_add(&events, chan->wait_fd,
								LPOLLIN | LPOLLPRI);
						break;
					case CONSUMER_CHANNEL_DEL:
					{
						struct lttng_consumer_stream *stream, *stmp;

						rcu_read_lock();
						chan = consumer_find_channel(key);
						if (!chan) {
							rcu_read_unlock();
							ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
							break;
						}
						lttng_poll_del(&events, chan->wait_fd);
						/* Re-seed the iterator to point at this channel's node. */
						iter.iter.node = &chan->wait_fd_node.node;
						ret = lttng_ht_del(channel_ht, &iter);
						assert(ret == 0);
						consumer_close_channel_streams(chan);

						switch (consumer_data.type) {
						case LTTNG_CONSUMER_KERNEL:
							break;
						case LTTNG_CONSUMER32_UST:
						case LTTNG_CONSUMER64_UST:
							/* Delete streams that might have been left in the stream list. */
							cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
									send_node) {
								cds_list_del(&stream->send_node);
								lttng_ustconsumer_del_stream(stream);
								uatomic_sub(&stream->chan->refcount, 1);
								assert(&chan->refcount);
								free(stream);
							}
							break;
						default:
							ERR("Unknown consumer_data type");
							assert(0);
						}

						/*
						 * Release our own refcount. Force channel deletion even if
						 * streams were not initialized.
						 */
						if (!uatomic_sub_return(&chan->refcount, 1)) {
							consumer_del_channel(chan);
						}
						rcu_read_unlock();
						/* Poll set changed; restart the wait from scratch. */
						goto restart;
					}
					case CONSUMER_CHANNEL_QUIT:
						/*
						 * Remove the pipe from the poll set and continue the loop
						 * since their might be data to consume.
						 */
						lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
						continue;
					default:
						ERR("Unknown action");
						break;
					}
				}

				/* Handle other stream */
				continue;
			}

			/* Not the command pipe: resolve the channel from its wait fd. */
			rcu_read_lock();
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(channel_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			assert(node);

			chan = caa_container_of(node, struct lttng_consumer_channel,
					wait_fd_node);

			/* Check for error event */
			if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Channel fd %d is hup|err.", pollfd);

				lttng_poll_del(&events, chan->wait_fd);
				ret = lttng_ht_del(channel_ht, &iter);
				assert(ret == 0);
				assert(cds_list_empty(&chan->streams.head));
				consumer_close_channel_streams(chan);

				/* Release our own refcount */
				if (!uatomic_sub_return(&chan->refcount, 1)
						&& !uatomic_read(&chan->nb_init_stream_left)) {
					consumer_del_channel(chan);
				}
			}

			/* Release RCU lock for the channel looked up */
			rcu_read_unlock();
		}
	}

end:
	lttng_poll_clean(&events);
end_poll:
	destroy_channel_ht(channel_ht);
end_ht:
	DBG("Channel poll thread exiting");
	rcu_unregister_thread();
	return NULL;
}
2733
331744e3
JD
2734static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
2735 struct pollfd *sockpoll, int client_socket)
2736{
2737 int ret;
2738
2739 assert(ctx);
2740 assert(sockpoll);
2741
2742 if (lttng_consumer_poll_socket(sockpoll) < 0) {
2743 ret = -1;
2744 goto error;
2745 }
2746 DBG("Metadata connection on client_socket");
2747
2748 /* Blocking call, waiting for transmission */
2749 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
2750 if (ctx->consumer_metadata_socket < 0) {
2751 WARN("On accept metadata");
2752 ret = -1;
2753 goto error;
2754 }
2755 ret = 0;
2756
2757error:
2758 return ret;
2759}
2760
3bd1e081
MD
2761/*
2762 * This thread listens on the consumerd socket and receives the file
2763 * descriptors from the session daemon.
2764 */
void *consumer_thread_sessiond_poll(void *data)
{
	int sock = -1, client_socket, ret;
	/*
	 * structure to poll for incoming data on communication socket avoids
	 * making blocking sockets.
	 */
	struct pollfd consumer_sockpoll[2];
	struct lttng_consumer_local_data *ctx = data;

	rcu_register_thread();

	DBG("Creating command socket %s", ctx->consumer_command_sock_path);
	unlink(ctx->consumer_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to lttng-sessiond");
	ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
	/* return < 0 on error, but == 0 is not fatal */
	if (ret < 0) {
		ERR("Error sending ready command to lttng-sessiond");
		goto end;
	}

	/* prepare the FDs to poll : to client socket and the should_quit pipe */
	consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
	consumer_sockpoll[0].events = POLLIN | POLLPRI;
	consumer_sockpoll[1].fd = client_socket;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		WARN("On accept");
		goto end;
	}

	/*
	 * Setup metadata socket which is the second socket connection on the
	 * command unix socket.
	 */
	ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
	if (ret < 0) {
		goto end;
	}

	/* This socket is not useful anymore. */
	ret = close(client_socket);
	if (ret < 0) {
		PERROR("close client_socket");
	}
	/* Mark closed so the cleanup path below does not close it twice. */
	client_socket = -1;

	/* update the polling structure to poll on the established socket */
	consumer_sockpoll[1].fd = sock;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	/* Command-processing loop: one sessiond command per iteration. */
	while (1) {
		if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
			goto end;
		}
		DBG("Incoming command on sock");
		ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
		if (ret == -ENOENT) {
			DBG("Received STOP command");
			goto end;
		}
		if (ret <= 0) {
			/*
			 * This could simply be a session daemon quitting. Don't output
			 * ERR() here.
			 */
			DBG("Communication interrupted on command socket");
			goto end;
		}
		if (consumer_quit) {
			DBG("consumer_thread_receive_fds received quit from signal");
			goto end;
		}
		DBG("received command on sock");
	}
end:
	DBG("Consumer thread sessiond poll exiting");

	/*
	 * Close metadata streams since the producer is the session daemon which
	 * just died.
	 *
	 * NOTE: for now, this only applies to the UST tracer.
	 */
	lttng_consumer_close_metadata();

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	consumer_quit = 1;

	/*
	 * Notify the data poll thread to poll back again and test the
	 * consumer_quit state that we just set so to quit gracefully.
	 */
	notify_thread_lttng_pipe(ctx->consumer_data_pipe);

	/* Ask the channel thread to quit as well. */
	notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);

	/* Cleaning up possibly open sockets. */
	if (sock >= 0) {
		ret = close(sock);
		if (ret < 0) {
			PERROR("close sock sessiond poll");
		}
	}
	if (client_socket >= 0) {
		ret = close(client_socket);
		if (ret < 0) {
			PERROR("close client_socket sessiond poll");
		}
	}

	rcu_unregister_thread();
	return NULL;
}
d41f73b7 2902
4078b776 2903ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
2904 struct lttng_consumer_local_data *ctx)
2905{
74251bb8
DG
2906 ssize_t ret;
2907
2908 pthread_mutex_lock(&stream->lock);
2909
d41f73b7
MD
2910 switch (consumer_data.type) {
2911 case LTTNG_CONSUMER_KERNEL:
74251bb8
DG
2912 ret = lttng_kconsumer_read_subbuffer(stream, ctx);
2913 break;
7753dea8
MD
2914 case LTTNG_CONSUMER32_UST:
2915 case LTTNG_CONSUMER64_UST:
74251bb8
DG
2916 ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
2917 break;
d41f73b7
MD
2918 default:
2919 ERR("Unknown consumer_data type");
2920 assert(0);
74251bb8
DG
2921 ret = -ENOSYS;
2922 break;
d41f73b7 2923 }
74251bb8
DG
2924
2925 pthread_mutex_unlock(&stream->lock);
2926 return ret;
d41f73b7
MD
2927}
2928
2929int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
2930{
2931 switch (consumer_data.type) {
2932 case LTTNG_CONSUMER_KERNEL:
2933 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
2934 case LTTNG_CONSUMER32_UST:
2935 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
2936 return lttng_ustconsumer_on_recv_stream(stream);
2937 default:
2938 ERR("Unknown consumer_data type");
2939 assert(0);
2940 return -ENOSYS;
2941 }
2942}
e4421fec
DG
2943
2944/*
2945 * Allocate and set consumer data hash tables.
2946 */
2947void lttng_consumer_init(void)
2948{
d88aee68
DG
2949 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
2950 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
2951 consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
d8ef542d 2952 consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
e4421fec 2953}
7735ef9e
DG
2954
2955/*
2956 * Process the ADD_RELAYD command receive by a consumer.
2957 *
2958 * This will create a relayd socket pair and add it to the relayd hash table.
2959 * The caller MUST acquire a RCU read side lock before calling it.
2960 */
2961int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
2962 struct lttng_consumer_local_data *ctx, int sock,
6151a90f
JD
2963 struct pollfd *consumer_sockpoll,
2964 struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
7735ef9e 2965{
cd2b09ed 2966 int fd = -1, ret = -1, relayd_created = 0;
f50f23d9 2967 enum lttng_error_code ret_code = LTTNG_OK;
d4298c99 2968 struct consumer_relayd_sock_pair *relayd = NULL;
7735ef9e 2969
6151a90f
JD
2970 assert(ctx);
2971 assert(relayd_sock);
2972
7735ef9e
DG
2973 DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
2974
2975 /* Get relayd reference if exists. */
2976 relayd = consumer_find_relayd(net_seq_idx);
2977 if (relayd == NULL) {
2978 /* Not found. Allocate one. */
2979 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
2980 if (relayd == NULL) {
0d08d75e
DG
2981 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
2982 ret = -ENOMEM;
2983 } else {
2984 relayd->sessiond_session_id = (uint64_t) sessiond_id;
2985 relayd_created = 1;
7735ef9e 2986 }
0d08d75e
DG
2987
2988 /*
2989 * This code path MUST continue to the consumer send status message to
2990 * we can notify the session daemon and continue our work without
2991 * killing everything.
2992 */
2993 }
2994
2995 /* First send a status message before receiving the fds. */
2996 ret = consumer_send_status_msg(sock, ret_code);
2997 if (ret < 0 || ret_code != LTTNG_OK) {
2998 /* Somehow, the session daemon is not responding anymore. */
2999 goto error;
7735ef9e
DG
3000 }
3001
3002 /* Poll on consumer socket. */
3003 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
0d08d75e 3004 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
7735ef9e
DG
3005 ret = -EINTR;
3006 goto error;
3007 }
3008
3009 /* Get relayd socket from session daemon */
3010 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
3011 if (ret != sizeof(fd)) {
0d08d75e 3012 ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
7735ef9e 3013 ret = -1;
4028eeb9 3014 fd = -1; /* Just in case it gets set with an invalid value. */
0d08d75e
DG
3015
3016 /*
3017 * Failing to receive FDs might indicate a major problem such as
3018 * reaching a fd limit during the receive where the kernel returns a
3019 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
3020 * don't take any chances and stop everything.
3021 *
3022 * XXX: Feature request #558 will fix that and avoid this possible
3023 * issue when reaching the fd limit.
3024 */
3025 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
3026
3027 /*
3028 * This code path MUST continue to the consumer send status message so
3029 * we can send the error to the thread expecting a reply. The above
3030 * call will make everything stop.
3031 */
7735ef9e
DG
3032 }
3033
f50f23d9
DG
3034 /* We have the fds without error. Send status back. */
3035 ret = consumer_send_status_msg(sock, ret_code);
0d08d75e 3036 if (ret < 0 || ret_code != LTTNG_OK) {
f50f23d9
DG
3037 /* Somehow, the session daemon is not responding anymore. */
3038 goto error;
3039 }
3040
7735ef9e
DG
3041 /* Copy socket information and received FD */
3042 switch (sock_type) {
3043 case LTTNG_STREAM_CONTROL:
3044 /* Copy received lttcomm socket */
6151a90f
JD
3045 lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
3046 ret = lttcomm_create_sock(&relayd->control_sock.sock);
4028eeb9 3047 /* Immediately try to close the created socket if valid. */
6151a90f
JD
3048 if (relayd->control_sock.sock.fd >= 0) {
3049 if (close(relayd->control_sock.sock.fd)) {
4028eeb9
DG
3050 PERROR("close relayd control socket");
3051 }
7735ef9e 3052 }
4028eeb9 3053 /* Handle create_sock error. */
f66c074c 3054 if (ret < 0) {
4028eeb9 3055 goto error;
f66c074c 3056 }
7735ef9e
DG
3057
3058 /* Assign new file descriptor */
6151a90f
JD
3059 relayd->control_sock.sock.fd = fd;
3060 /* Assign version values. */
3061 relayd->control_sock.major = relayd_sock->major;
3062 relayd->control_sock.minor = relayd_sock->minor;
c5b6f4f0
DG
3063
3064 /*
59e71485
DG
3065 * Create a session on the relayd and store the returned id. Lock the
3066 * control socket mutex if the relayd was NOT created before.
c5b6f4f0 3067 */
59e71485
DG
3068 if (!relayd_created) {
3069 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
3070 }
c5b6f4f0 3071 ret = relayd_create_session(&relayd->control_sock,
f7079f67 3072 &relayd->relayd_session_id);
59e71485
DG
3073 if (!relayd_created) {
3074 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
3075 }
c5b6f4f0 3076 if (ret < 0) {
ffe60014
DG
3077 /*
3078 * Close all sockets of a relayd object. It will be freed if it was
3079 * created at the error code path or else it will be garbage
3080 * collect.
3081 */
3082 (void) relayd_close(&relayd->control_sock);
3083 (void) relayd_close(&relayd->data_sock);
c5b6f4f0
DG
3084 goto error;
3085 }
3086
7735ef9e
DG
3087 break;
3088 case LTTNG_STREAM_DATA:
3089 /* Copy received lttcomm socket */
6151a90f
JD
3090 lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
3091 ret = lttcomm_create_sock(&relayd->data_sock.sock);
4028eeb9 3092 /* Immediately try to close the created socket if valid. */
6151a90f
JD
3093 if (relayd->data_sock.sock.fd >= 0) {
3094 if (close(relayd->data_sock.sock.fd)) {
4028eeb9
DG
3095 PERROR("close relayd data socket");
3096 }
7735ef9e 3097 }
4028eeb9 3098 /* Handle create_sock error. */
f66c074c 3099 if (ret < 0) {
4028eeb9 3100 goto error;
f66c074c 3101 }
7735ef9e
DG
3102
3103 /* Assign new file descriptor */
6151a90f
JD
3104 relayd->data_sock.sock.fd = fd;
3105 /* Assign version values. */
3106 relayd->data_sock.major = relayd_sock->major;
3107 relayd->data_sock.minor = relayd_sock->minor;
7735ef9e
DG
3108 break;
3109 default:
3110 ERR("Unknown relayd socket type (%d)", sock_type);
59e71485 3111 ret = -1;
7735ef9e
DG
3112 goto error;
3113 }
3114
d88aee68 3115 DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
7735ef9e
DG
3116 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
3117 relayd->net_seq_idx, fd);
3118
3119 /*
3120 * Add relayd socket pair to consumer data hashtable. If object already
3121 * exists or on error, the function gracefully returns.
3122 */
d09e1200 3123 add_relayd(relayd);
7735ef9e
DG
3124
3125 /* All good! */
4028eeb9 3126 return 0;
7735ef9e
DG
3127
3128error:
4028eeb9
DG
3129 /* Close received socket if valid. */
3130 if (fd >= 0) {
3131 if (close(fd)) {
3132 PERROR("close received socket");
3133 }
3134 }
cd2b09ed
DG
3135
3136 if (relayd_created) {
cd2b09ed
DG
3137 free(relayd);
3138 }
3139
7735ef9e
DG
3140 return ret;
3141}
ca22feea 3142
4e9a4686
DG
3143/*
3144 * Try to lock the stream mutex.
3145 *
3146 * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
3147 */
3148static int stream_try_lock(struct lttng_consumer_stream *stream)
3149{
3150 int ret;
3151
3152 assert(stream);
3153
3154 /*
3155 * Try to lock the stream mutex. On failure, we know that the stream is
3156 * being used else where hence there is data still being extracted.
3157 */
3158 ret = pthread_mutex_trylock(&stream->lock);
3159 if (ret) {
3160 /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
3161 ret = 0;
3162 goto end;
3163 }
3164
3165 ret = 1;
3166
3167end:
3168 return ret;
3169}
3170
f7079f67
DG
3171/*
3172 * Search for a relayd associated to the session id and return the reference.
3173 *
3174 * A rcu read side lock MUST be acquire before calling this function and locked
3175 * until the relayd object is no longer necessary.
3176 */
3177static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3178{
3179 struct lttng_ht_iter iter;
f7079f67 3180 struct consumer_relayd_sock_pair *relayd = NULL;
f7079f67
DG
3181
3182 /* Iterate over all relayd since they are indexed by net_seq_idx. */
3183 cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
3184 node.node) {
18261bd1
DG
3185 /*
3186 * Check by sessiond id which is unique here where the relayd session
3187 * id might not be when having multiple relayd.
3188 */
3189 if (relayd->sessiond_session_id == id) {
f7079f67 3190 /* Found the relayd. There can be only one per id. */
18261bd1 3191 goto found;
f7079f67
DG
3192 }
3193 }
3194
18261bd1
DG
3195 return NULL;
3196
3197found:
f7079f67
DG
3198 return relayd;
3199}
3200
ca22feea
DG
3201/*
3202 * Check if for a given session id there is still data needed to be extract
3203 * from the buffers.
3204 *
6d805429 3205 * Return 1 if data is pending or else 0 meaning ready to be read.
ca22feea 3206 */
6d805429 3207int consumer_data_pending(uint64_t id)
ca22feea
DG
3208{
3209 int ret;
3210 struct lttng_ht_iter iter;
3211 struct lttng_ht *ht;
3212 struct lttng_consumer_stream *stream;
f7079f67 3213 struct consumer_relayd_sock_pair *relayd = NULL;
6d805429 3214 int (*data_pending)(struct lttng_consumer_stream *);
ca22feea 3215
6d805429 3216 DBG("Consumer data pending command on session id %" PRIu64, id);
ca22feea 3217
6f6eda74 3218 rcu_read_lock();
ca22feea
DG
3219 pthread_mutex_lock(&consumer_data.lock);
3220
3221 switch (consumer_data.type) {
3222 case LTTNG_CONSUMER_KERNEL:
6d805429 3223 data_pending = lttng_kconsumer_data_pending;
ca22feea
DG
3224 break;
3225 case LTTNG_CONSUMER32_UST:
3226 case LTTNG_CONSUMER64_UST:
6d805429 3227 data_pending = lttng_ustconsumer_data_pending;
ca22feea
DG
3228 break;
3229 default:
3230 ERR("Unknown consumer data type");
3231 assert(0);
3232 }
3233
3234 /* Ease our life a bit */
3235 ht = consumer_data.stream_list_ht;
3236
f7079f67
DG
3237 relayd = find_relayd_by_session_id(id);
3238 if (relayd) {
3239 /* Send init command for data pending. */
3240 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
3241 ret = relayd_begin_data_pending(&relayd->control_sock,
3242 relayd->relayd_session_id);
3243 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
3244 if (ret < 0) {
3245 /* Communication error thus the relayd so no data pending. */
3246 goto data_not_pending;
3247 }
3248 }
3249
c8f59ee5 3250 cds_lfht_for_each_entry_duplicate(ht->ht,
d88aee68
DG
3251 ht->hash_fct(&id, lttng_ht_seed),
3252 ht->match_fct, &id,
ca22feea 3253 &iter.iter, stream, node_session_id.node) {
4e9a4686
DG
3254 /* If this call fails, the stream is being used hence data pending. */
3255 ret = stream_try_lock(stream);
3256 if (!ret) {
f7079f67 3257 goto data_pending;
ca22feea 3258 }
ca22feea 3259
4e9a4686
DG
3260 /*
3261 * A removed node from the hash table indicates that the stream has
3262 * been deleted thus having a guarantee that the buffers are closed
3263 * on the consumer side. However, data can still be transmitted
3264 * over the network so don't skip the relayd check.
3265 */
3266 ret = cds_lfht_is_node_deleted(&stream->node.node);
3267 if (!ret) {
3268 /* Check the stream if there is data in the buffers. */
6d805429
DG
3269 ret = data_pending(stream);
3270 if (ret == 1) {
4e9a4686 3271 pthread_mutex_unlock(&stream->lock);
f7079f67 3272 goto data_pending;
4e9a4686
DG
3273 }
3274 }
3275
3276 /* Relayd check */
f7079f67 3277 if (relayd) {
c8f59ee5
DG
3278 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
3279 if (stream->metadata_flag) {
ad7051c0
DG
3280 ret = relayd_quiescent_control(&relayd->control_sock,
3281 stream->relayd_stream_id);
c8f59ee5 3282 } else {
6d805429 3283 ret = relayd_data_pending(&relayd->control_sock,
39df6d9f
DG
3284 stream->relayd_stream_id,
3285 stream->next_net_seq_num - 1);
c8f59ee5
DG
3286 }
3287 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
6d805429 3288 if (ret == 1) {
4e9a4686 3289 pthread_mutex_unlock(&stream->lock);
f7079f67 3290 goto data_pending;
c8f59ee5
DG
3291 }
3292 }
4e9a4686 3293 pthread_mutex_unlock(&stream->lock);
c8f59ee5 3294 }
ca22feea 3295
f7079f67
DG
3296 if (relayd) {
3297 unsigned int is_data_inflight = 0;
3298
3299 /* Send init command for data pending. */
3300 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
3301 ret = relayd_end_data_pending(&relayd->control_sock,
3302 relayd->relayd_session_id, &is_data_inflight);
3303 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
bdd88757 3304 if (ret < 0) {
f7079f67
DG
3305 goto data_not_pending;
3306 }
bdd88757
DG
3307 if (is_data_inflight) {
3308 goto data_pending;
3309 }
f7079f67
DG
3310 }
3311
ca22feea 3312 /*
f7079f67
DG
3313 * Finding _no_ node in the hash table and no inflight data means that the
3314 * stream(s) have been removed thus data is guaranteed to be available for
3315 * analysis from the trace files.
ca22feea
DG
3316 */
3317
f7079f67 3318data_not_pending:
ca22feea
DG
3319 /* Data is available to be read by a viewer. */
3320 pthread_mutex_unlock(&consumer_data.lock);
c8f59ee5 3321 rcu_read_unlock();
6d805429 3322 return 0;
ca22feea 3323
f7079f67 3324data_pending:
ca22feea
DG
3325 /* Data is still being extracted from buffers. */
3326 pthread_mutex_unlock(&consumer_data.lock);
c8f59ee5 3327 rcu_read_unlock();
6d805429 3328 return 1;
ca22feea 3329}
f50f23d9
DG
3330
3331/*
3332 * Send a ret code status message to the sessiond daemon.
3333 *
3334 * Return the sendmsg() return value.
3335 */
3336int consumer_send_status_msg(int sock, int ret_code)
3337{
3338 struct lttcomm_consumer_status_msg msg;
3339
3340 msg.ret_code = ret_code;
3341
3342 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3343}
ffe60014
DG
3344
3345/*
3346 * Send a channel status message to the sessiond daemon.
3347 *
3348 * Return the sendmsg() return value.
3349 */
3350int consumer_send_status_channel(int sock,
3351 struct lttng_consumer_channel *channel)
3352{
3353 struct lttcomm_consumer_status_channel msg;
3354
3355 assert(sock >= 0);
3356
3357 if (!channel) {
3358 msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
3359 } else {
3360 msg.ret_code = LTTNG_OK;
3361 msg.key = channel->key;
3362 msg.stream_count = channel->streams.count;
3363 }
3364
3365 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3366}
This page took 0.234938 seconds and 5 git commands to generate.