Fix: do not repurpose iterator while it is being used
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23 #include <inttypes.h>
24
25 #include <common/common.h>
26 #include <common/defaults.h>
27 #include <common/compat/string.h>
28
29 #include "consumer.h"
30 #include "health-sessiond.h"
31 #include "kernel-consumer.h"
32 #include "notification-thread-commands.h"
33 #include "session.h"
34 #include "lttng-sessiond.h"
35
36 static char *create_channel_path(struct consumer_output *consumer,
37 uid_t uid, gid_t gid)
38 {
39 int ret;
40 char tmp_path[PATH_MAX];
41 char *pathname = NULL;
42
43 assert(consumer);
44
45 /* Get the right path name destination */
46 if (consumer->type == CONSUMER_DST_LOCAL) {
47 /* Set application path to the destination path */
48 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s%s",
49 consumer->dst.session_root_path,
50 consumer->chunk_path,
51 consumer->subdir);
52 if (ret < 0) {
53 PERROR("snprintf kernel channel path");
54 goto error;
55 } else if (ret >= sizeof(tmp_path)) {
56 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s%s\"",
57 sizeof(tmp_path), ret,
58 consumer->dst.session_root_path,
59 consumer->chunk_path,
60 consumer->subdir);
61 goto error;
62 }
63 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
64 if (!pathname) {
65 PERROR("lttng_strndup");
66 goto error;
67 }
68
69 /* Create directory */
70 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
71 if (ret < 0) {
72 if (errno != EEXIST) {
73 ERR("Trace directory creation error");
74 goto error;
75 }
76 }
77 DBG3("Kernel local consumer tracefile path: %s", pathname);
78 } else {
79 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
80 consumer->dst.net.base_dir,
81 consumer->subdir);
82 if (ret < 0) {
83 PERROR("snprintf kernel metadata path");
84 goto error;
85 } else if (ret >= sizeof(tmp_path)) {
86 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
87 sizeof(tmp_path), ret,
88 consumer->dst.net.base_dir,
89 consumer->subdir);
90 goto error;
91 }
92 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
93 if (!pathname) {
94 PERROR("lttng_strndup");
95 goto error;
96 }
97 DBG3("Kernel network consumer subdir path: %s", pathname);
98 }
99
100 return pathname;
101
102 error:
103 free(pathname);
104 return NULL;
105 }
106
107 /*
108 * Sending a single channel to the consumer with command ADD_CHANNEL.
109 */
110 int kernel_consumer_add_channel(struct consumer_socket *sock,
111 struct ltt_kernel_channel *channel,
112 struct ltt_kernel_session *ksession,
113 unsigned int monitor)
114 {
115 int ret;
116 char *pathname;
117 struct lttcomm_consumer_msg lkm;
118 struct consumer_output *consumer;
119 enum lttng_error_code status;
120 struct ltt_session *session = NULL;
121 struct lttng_channel_extended *channel_attr_extended;
122
123 /* Safety net */
124 assert(channel);
125 assert(ksession);
126 assert(ksession->consumer);
127
128 consumer = ksession->consumer;
129 channel_attr_extended = (struct lttng_channel_extended *)
130 channel->channel->attr.extended.ptr;
131
132 DBG("Kernel consumer adding channel %s to kernel consumer",
133 channel->channel->name);
134
135 if (monitor) {
136 pathname = create_channel_path(consumer, ksession->uid,
137 ksession->gid);
138 } else {
139 /* Empty path. */
140 pathname = strdup("");
141 }
142 if (!pathname) {
143 ret = -1;
144 goto error;
145 }
146
147 /* Prep channel message structure */
148 consumer_init_add_channel_comm_msg(&lkm,
149 channel->key,
150 ksession->id,
151 pathname,
152 ksession->uid,
153 ksession->gid,
154 consumer->net_seq_index,
155 channel->channel->name,
156 channel->stream_count,
157 channel->channel->attr.output,
158 CONSUMER_CHANNEL_TYPE_DATA,
159 channel->channel->attr.tracefile_size,
160 channel->channel->attr.tracefile_count,
161 monitor,
162 channel->channel->attr.live_timer_interval,
163 channel_attr_extended->monitor_timer_interval);
164
165 health_code_update();
166
167 ret = consumer_send_channel(sock, &lkm);
168 if (ret < 0) {
169 goto error;
170 }
171
172 health_code_update();
173 rcu_read_lock();
174 session = session_find_by_id(ksession->id);
175 assert(session);
176 assert(pthread_mutex_trylock(&session->lock));
177 assert(session_trylock_list());
178
179 status = notification_thread_command_add_channel(
180 notification_thread_handle, session->name,
181 ksession->uid, ksession->gid,
182 channel->channel->name, channel->key,
183 LTTNG_DOMAIN_KERNEL,
184 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
185 rcu_read_unlock();
186 if (status != LTTNG_OK) {
187 ret = -1;
188 goto error;
189 }
190
191 channel->published_to_notification_thread = true;
192
193 error:
194 if (session) {
195 session_put(session);
196 }
197 free(pathname);
198 return ret;
199 }
200
201 /*
202 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
203 *
204 * The consumer socket lock must be held by the caller.
205 */
206 int kernel_consumer_add_metadata(struct consumer_socket *sock,
207 struct ltt_kernel_session *ksession, unsigned int monitor)
208 {
209 int ret;
210 char *pathname;
211 struct lttcomm_consumer_msg lkm;
212 struct consumer_output *consumer;
213 struct ltt_session *session = NULL;
214
215 rcu_read_lock();
216
217 /* Safety net */
218 assert(ksession);
219 assert(ksession->consumer);
220 assert(sock);
221
222 DBG("Sending metadata %d to kernel consumer",
223 ksession->metadata_stream_fd);
224
225 /* Get consumer output pointer */
226 consumer = ksession->consumer;
227
228 if (monitor) {
229 pathname = create_channel_path(consumer,
230 ksession->uid, ksession->gid);
231 } else {
232 /* Empty path. */
233 pathname = strdup("");
234 }
235 if (!pathname) {
236 ret = -1;
237 goto error;
238 }
239
240 session = session_find_by_id(ksession->id);
241 assert(session);
242 assert(pthread_mutex_trylock(&session->lock));
243 assert(session_trylock_list());
244
245 /* Prep channel message structure */
246 consumer_init_add_channel_comm_msg(&lkm,
247 ksession->metadata->key,
248 ksession->id,
249 pathname,
250 ksession->uid,
251 ksession->gid,
252 consumer->net_seq_index,
253 DEFAULT_METADATA_NAME,
254 1,
255 DEFAULT_KERNEL_CHANNEL_OUTPUT,
256 CONSUMER_CHANNEL_TYPE_METADATA,
257 0, 0,
258 monitor, 0, 0);
259
260 health_code_update();
261
262 ret = consumer_send_channel(sock, &lkm);
263 if (ret < 0) {
264 goto error;
265 }
266
267 health_code_update();
268
269 /* Prep stream message structure */
270 consumer_init_add_stream_comm_msg(&lkm,
271 ksession->metadata->key,
272 ksession->metadata_stream_fd,
273 0 /* CPU: 0 for metadata. */,
274 session->current_archive_id);
275
276 health_code_update();
277
278 /* Send stream and file descriptor */
279 ret = consumer_send_stream(sock, consumer, &lkm,
280 &ksession->metadata_stream_fd, 1);
281 if (ret < 0) {
282 goto error;
283 }
284
285 health_code_update();
286
287 error:
288 rcu_read_unlock();
289 free(pathname);
290 if (session) {
291 session_put(session);
292 }
293 return ret;
294 }
295
296 /*
297 * Sending a single stream to the consumer with command ADD_STREAM.
298 */
299 static
300 int kernel_consumer_add_stream(struct consumer_socket *sock,
301 struct ltt_kernel_channel *channel,
302 struct ltt_kernel_stream *stream,
303 struct ltt_kernel_session *session, unsigned int monitor,
304 uint64_t trace_archive_id)
305 {
306 int ret;
307 struct lttcomm_consumer_msg lkm;
308 struct consumer_output *consumer;
309
310 assert(channel);
311 assert(stream);
312 assert(session);
313 assert(session->consumer);
314 assert(sock);
315
316 DBG("Sending stream %d of channel %s to kernel consumer",
317 stream->fd, channel->channel->name);
318
319 /* Get consumer output pointer */
320 consumer = session->consumer;
321
322 /* Prep stream consumer message */
323 consumer_init_add_stream_comm_msg(&lkm,
324 channel->key,
325 stream->fd,
326 stream->cpu,
327 trace_archive_id);
328
329 health_code_update();
330
331 /* Send stream and file descriptor */
332 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
333 if (ret < 0) {
334 goto error;
335 }
336
337 health_code_update();
338
339 error:
340 return ret;
341 }
342
343 /*
344 * Sending the notification that all streams were sent with STREAMS_SENT.
345 */
346 int kernel_consumer_streams_sent(struct consumer_socket *sock,
347 struct ltt_kernel_session *session, uint64_t channel_key)
348 {
349 int ret;
350 struct lttcomm_consumer_msg lkm;
351 struct consumer_output *consumer;
352
353 assert(sock);
354 assert(session);
355
356 DBG("Sending streams_sent");
357 /* Get consumer output pointer */
358 consumer = session->consumer;
359
360 /* Prep stream consumer message */
361 consumer_init_streams_sent_comm_msg(&lkm,
362 LTTNG_CONSUMER_STREAMS_SENT,
363 channel_key, consumer->net_seq_index);
364
365 health_code_update();
366
367 /* Send stream and file descriptor */
368 ret = consumer_send_msg(sock, &lkm);
369 if (ret < 0) {
370 goto error;
371 }
372
373 error:
374 return ret;
375 }
376
377 /*
378 * Send all stream fds of kernel channel to the consumer.
379 *
380 * The consumer socket lock must be held by the caller.
381 */
382 int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
383 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
384 unsigned int monitor)
385 {
386 int ret = LTTNG_OK;
387 struct ltt_kernel_stream *stream;
388 struct ltt_session *session = NULL;
389
390 /* Safety net */
391 assert(channel);
392 assert(ksession);
393 assert(ksession->consumer);
394 assert(sock);
395
396 rcu_read_lock();
397
398 session = session_find_by_id(ksession->id);
399 assert(session);
400 assert(pthread_mutex_trylock(&session->lock));
401 assert(session_trylock_list());
402
403 /* Bail out if consumer is disabled */
404 if (!ksession->consumer->enabled) {
405 ret = LTTNG_OK;
406 goto error;
407 }
408
409 DBG("Sending streams of channel %s to kernel consumer",
410 channel->channel->name);
411
412 if (!channel->sent_to_consumer) {
413 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
414 if (ret < 0) {
415 goto error;
416 }
417 channel->sent_to_consumer = true;
418 }
419
420 /* Send streams */
421 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
422 if (!stream->fd || stream->sent_to_consumer) {
423 continue;
424 }
425
426 /* Add stream on the kernel consumer side. */
427 ret = kernel_consumer_add_stream(sock, channel, stream,
428 ksession, monitor, session->current_archive_id);
429 if (ret < 0) {
430 goto error;
431 }
432 stream->sent_to_consumer = true;
433 }
434
435 error:
436 rcu_read_unlock();
437 if (session) {
438 session_put(session);
439 }
440 return ret;
441 }
442
443 /*
444 * Send all stream fds of the kernel session to the consumer.
445 *
446 * The consumer socket lock must be held by the caller.
447 */
448 int kernel_consumer_send_session(struct consumer_socket *sock,
449 struct ltt_kernel_session *session)
450 {
451 int ret, monitor = 0;
452 struct ltt_kernel_channel *chan;
453
454 /* Safety net */
455 assert(session);
456 assert(session->consumer);
457 assert(sock);
458
459 /* Bail out if consumer is disabled */
460 if (!session->consumer->enabled) {
461 ret = LTTNG_OK;
462 goto error;
463 }
464
465 /* Don't monitor the streams on the consumer if in flight recorder. */
466 if (session->output_traces) {
467 monitor = 1;
468 }
469
470 DBG("Sending session stream to kernel consumer");
471
472 if (session->metadata_stream_fd >= 0 && session->metadata) {
473 ret = kernel_consumer_add_metadata(sock, session, monitor);
474 if (ret < 0) {
475 goto error;
476 }
477 }
478
479 /* Send channel and streams of it */
480 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
481 ret = kernel_consumer_send_channel_streams(sock, chan, session,
482 monitor);
483 if (ret < 0) {
484 goto error;
485 }
486 if (monitor) {
487 /*
488 * Inform the relay that all the streams for the
489 * channel were sent.
490 */
491 ret = kernel_consumer_streams_sent(sock, session, chan->key);
492 if (ret < 0) {
493 goto error;
494 }
495 }
496 }
497
498 DBG("Kernel consumer FDs of metadata and channel streams sent");
499
500 session->consumer_fds_sent = 1;
501 return 0;
502
503 error:
504 return ret;
505 }
506
507 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
508 struct ltt_kernel_channel *channel)
509 {
510 int ret;
511 struct lttcomm_consumer_msg msg;
512
513 assert(channel);
514 assert(socket);
515
516 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
517
518 memset(&msg, 0, sizeof(msg));
519 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
520 msg.u.destroy_channel.key = channel->key;
521
522 pthread_mutex_lock(socket->lock);
523 health_code_update();
524
525 ret = consumer_send_msg(socket, &msg);
526 if (ret < 0) {
527 goto error;
528 }
529
530 error:
531 health_code_update();
532 pthread_mutex_unlock(socket->lock);
533 return ret;
534 }
535
536 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
537 struct ltt_kernel_metadata *metadata)
538 {
539 int ret;
540 struct lttcomm_consumer_msg msg;
541
542 assert(metadata);
543 assert(socket);
544
545 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
546
547 memset(&msg, 0, sizeof(msg));
548 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
549 msg.u.destroy_channel.key = metadata->key;
550
551 pthread_mutex_lock(socket->lock);
552 health_code_update();
553
554 ret = consumer_send_msg(socket, &msg);
555 if (ret < 0) {
556 goto error;
557 }
558
559 error:
560 health_code_update();
561 pthread_mutex_unlock(socket->lock);
562 return ret;
563 }
This page took 0.048784 seconds and 5 git commands to generate.