Change trace_path to session_root_path and chunk_path
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23
24 #include <common/common.h>
25 #include <common/defaults.h>
26 #include <common/compat/string.h>
27
28 #include "consumer.h"
29 #include "health-sessiond.h"
30 #include "kernel-consumer.h"
31 #include "notification-thread-commands.h"
32 #include "session.h"
33 #include "lttng-sessiond.h"
34
35 static char *create_channel_path(struct consumer_output *consumer,
36 uid_t uid, gid_t gid)
37 {
38 int ret;
39 char tmp_path[PATH_MAX];
40 char *pathname = NULL;
41
42 assert(consumer);
43
44 /* Get the right path name destination */
45 if (consumer->type == CONSUMER_DST_LOCAL) {
46 /* Set application path to the destination path */
47 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s%s",
48 consumer->dst.session_root_path,
49 consumer->chunk_path,
50 consumer->subdir);
51 if (ret < 0) {
52 PERROR("snprintf kernel channel path");
53 goto error;
54 }
55 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
56 if (!pathname) {
57 PERROR("lttng_strndup");
58 goto error;
59 }
60
61 /* Create directory */
62 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
63 if (ret < 0) {
64 if (errno != EEXIST) {
65 ERR("Trace directory creation error");
66 goto error;
67 }
68 }
69 DBG3("Kernel local consumer tracefile path: %s", pathname);
70 } else {
71 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
72 if (ret < 0) {
73 PERROR("snprintf kernel metadata path");
74 goto error;
75 }
76 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
77 if (!pathname) {
78 PERROR("lttng_strndup");
79 goto error;
80 }
81 DBG3("Kernel network consumer subdir path: %s", pathname);
82 }
83
84 return pathname;
85
86 error:
87 free(pathname);
88 return NULL;
89 }
90
91 /*
92 * Sending a single channel to the consumer with command ADD_CHANNEL.
93 */
94 int kernel_consumer_add_channel(struct consumer_socket *sock,
95 struct ltt_kernel_channel *channel,
96 struct ltt_kernel_session *ksession,
97 unsigned int monitor)
98 {
99 int ret;
100 char *pathname;
101 struct lttcomm_consumer_msg lkm;
102 struct consumer_output *consumer;
103 enum lttng_error_code status;
104 struct ltt_session *session;
105 struct lttng_channel_extended *channel_attr_extended;
106
107 /* Safety net */
108 assert(channel);
109 assert(ksession);
110 assert(ksession->consumer);
111
112 consumer = ksession->consumer;
113 channel_attr_extended = (struct lttng_channel_extended *)
114 channel->channel->attr.extended.ptr;
115
116 DBG("Kernel consumer adding channel %s to kernel consumer",
117 channel->channel->name);
118
119 if (monitor) {
120 pathname = create_channel_path(consumer, ksession->uid,
121 ksession->gid);
122 } else {
123 /* Empty path. */
124 pathname = strdup("");
125 }
126 if (!pathname) {
127 ret = -1;
128 goto error;
129 }
130
131 /* Prep channel message structure */
132 consumer_init_channel_comm_msg(&lkm,
133 LTTNG_CONSUMER_ADD_CHANNEL,
134 channel->fd,
135 ksession->id,
136 pathname,
137 ksession->uid,
138 ksession->gid,
139 consumer->net_seq_index,
140 channel->channel->name,
141 channel->stream_count,
142 channel->channel->attr.output,
143 CONSUMER_CHANNEL_TYPE_DATA,
144 channel->channel->attr.tracefile_size,
145 channel->channel->attr.tracefile_count,
146 monitor,
147 channel->channel->attr.live_timer_interval,
148 channel_attr_extended->monitor_timer_interval);
149
150 health_code_update();
151
152 ret = consumer_send_channel(sock, &lkm);
153 if (ret < 0) {
154 goto error;
155 }
156
157 health_code_update();
158 rcu_read_lock();
159 session = session_find_by_id(ksession->id);
160 assert(session);
161
162 status = notification_thread_command_add_channel(
163 notification_thread_handle, session->name,
164 ksession->uid, ksession->gid,
165 channel->channel->name, channel->fd,
166 LTTNG_DOMAIN_KERNEL,
167 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
168 rcu_read_unlock();
169 if (status != LTTNG_OK) {
170 ret = -1;
171 goto error;
172 }
173
174 channel->published_to_notification_thread = true;
175
176 error:
177 free(pathname);
178 return ret;
179 }
180
181 /*
182 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
183 */
184 int kernel_consumer_add_metadata(struct consumer_socket *sock,
185 struct ltt_kernel_session *session, unsigned int monitor)
186 {
187 int ret;
188 char *pathname;
189 struct lttcomm_consumer_msg lkm;
190 struct consumer_output *consumer;
191
192 /* Safety net */
193 assert(session);
194 assert(session->consumer);
195 assert(sock);
196
197 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
198
199 /* Get consumer output pointer */
200 consumer = session->consumer;
201
202 if (monitor) {
203 pathname = create_channel_path(consumer, session->uid, session->gid);
204 } else {
205 /* Empty path. */
206 pathname = strdup("");
207 }
208 if (!pathname) {
209 ret = -1;
210 goto error;
211 }
212
213 /* Prep channel message structure */
214 consumer_init_channel_comm_msg(&lkm,
215 LTTNG_CONSUMER_ADD_CHANNEL,
216 session->metadata->fd,
217 session->id,
218 pathname,
219 session->uid,
220 session->gid,
221 consumer->net_seq_index,
222 DEFAULT_METADATA_NAME,
223 1,
224 DEFAULT_KERNEL_CHANNEL_OUTPUT,
225 CONSUMER_CHANNEL_TYPE_METADATA,
226 0, 0,
227 monitor, 0, 0);
228
229 health_code_update();
230
231 ret = consumer_send_channel(sock, &lkm);
232 if (ret < 0) {
233 goto error;
234 }
235
236 health_code_update();
237
238 /* Prep stream message structure */
239 consumer_init_stream_comm_msg(&lkm,
240 LTTNG_CONSUMER_ADD_STREAM,
241 session->metadata->fd,
242 session->metadata_stream_fd,
243 0); /* CPU: 0 for metadata. */
244
245 health_code_update();
246
247 /* Send stream and file descriptor */
248 ret = consumer_send_stream(sock, consumer, &lkm,
249 &session->metadata_stream_fd, 1);
250 if (ret < 0) {
251 goto error;
252 }
253
254 health_code_update();
255
256 error:
257 free(pathname);
258 return ret;
259 }
260
261 /*
262 * Sending a single stream to the consumer with command ADD_STREAM.
263 */
264 int kernel_consumer_add_stream(struct consumer_socket *sock,
265 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
266 struct ltt_kernel_session *session, unsigned int monitor)
267 {
268 int ret;
269 struct lttcomm_consumer_msg lkm;
270 struct consumer_output *consumer;
271
272 assert(channel);
273 assert(stream);
274 assert(session);
275 assert(session->consumer);
276 assert(sock);
277
278 DBG("Sending stream %d of channel %s to kernel consumer",
279 stream->fd, channel->channel->name);
280
281 /* Get consumer output pointer */
282 consumer = session->consumer;
283
284 /* Prep stream consumer message */
285 consumer_init_stream_comm_msg(&lkm,
286 LTTNG_CONSUMER_ADD_STREAM,
287 channel->fd,
288 stream->fd,
289 stream->cpu);
290
291 health_code_update();
292
293 /* Send stream and file descriptor */
294 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
295 if (ret < 0) {
296 goto error;
297 }
298
299 health_code_update();
300
301 error:
302 return ret;
303 }
304
305 /*
306 * Sending the notification that all streams were sent with STREAMS_SENT.
307 */
308 int kernel_consumer_streams_sent(struct consumer_socket *sock,
309 struct ltt_kernel_session *session, uint64_t channel_key)
310 {
311 int ret;
312 struct lttcomm_consumer_msg lkm;
313 struct consumer_output *consumer;
314
315 assert(sock);
316 assert(session);
317
318 DBG("Sending streams_sent");
319 /* Get consumer output pointer */
320 consumer = session->consumer;
321
322 /* Prep stream consumer message */
323 consumer_init_streams_sent_comm_msg(&lkm,
324 LTTNG_CONSUMER_STREAMS_SENT,
325 channel_key, consumer->net_seq_index);
326
327 health_code_update();
328
329 /* Send stream and file descriptor */
330 ret = consumer_send_msg(sock, &lkm);
331 if (ret < 0) {
332 goto error;
333 }
334
335 error:
336 return ret;
337 }
338
339 /*
340 * Send all stream fds of kernel channel to the consumer.
341 */
342 int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
343 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
344 unsigned int monitor)
345 {
346 int ret = LTTNG_OK;
347 struct ltt_kernel_stream *stream;
348
349 /* Safety net */
350 assert(channel);
351 assert(session);
352 assert(session->consumer);
353 assert(sock);
354
355 /* Bail out if consumer is disabled */
356 if (!session->consumer->enabled) {
357 ret = LTTNG_OK;
358 goto error;
359 }
360
361 DBG("Sending streams of channel %s to kernel consumer",
362 channel->channel->name);
363
364 if (!channel->sent_to_consumer) {
365 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
366 if (ret < 0) {
367 goto error;
368 }
369 channel->sent_to_consumer = true;
370 }
371
372 /* Send streams */
373 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
374 if (!stream->fd || stream->sent_to_consumer) {
375 continue;
376 }
377
378 /* Add stream on the kernel consumer side. */
379 ret = kernel_consumer_add_stream(sock, channel, stream, session,
380 monitor);
381 if (ret < 0) {
382 goto error;
383 }
384 stream->sent_to_consumer = true;
385 }
386
387 error:
388 return ret;
389 }
390
391 /*
392 * Send all stream fds of the kernel session to the consumer.
393 */
394 int kernel_consumer_send_session(struct consumer_socket *sock,
395 struct ltt_kernel_session *session)
396 {
397 int ret, monitor = 0;
398 struct ltt_kernel_channel *chan;
399
400 /* Safety net */
401 assert(session);
402 assert(session->consumer);
403 assert(sock);
404
405 /* Bail out if consumer is disabled */
406 if (!session->consumer->enabled) {
407 ret = LTTNG_OK;
408 goto error;
409 }
410
411 /* Don't monitor the streams on the consumer if in flight recorder. */
412 if (session->output_traces) {
413 monitor = 1;
414 }
415
416 DBG("Sending session stream to kernel consumer");
417
418 if (session->metadata_stream_fd >= 0 && session->metadata) {
419 ret = kernel_consumer_add_metadata(sock, session, monitor);
420 if (ret < 0) {
421 goto error;
422 }
423 }
424
425 /* Send channel and streams of it */
426 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
427 ret = kernel_consumer_send_channel_stream(sock, chan, session,
428 monitor);
429 if (ret < 0) {
430 goto error;
431 }
432 if (monitor) {
433 /*
434 * Inform the relay that all the streams for the
435 * channel were sent.
436 */
437 ret = kernel_consumer_streams_sent(sock, session, chan->fd);
438 if (ret < 0) {
439 goto error;
440 }
441 }
442 }
443
444 DBG("Kernel consumer FDs of metadata and channel streams sent");
445
446 session->consumer_fds_sent = 1;
447 return 0;
448
449 error:
450 return ret;
451 }
452
453 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
454 struct ltt_kernel_channel *channel)
455 {
456 int ret;
457 struct lttcomm_consumer_msg msg;
458
459 assert(channel);
460 assert(socket);
461
462 DBG("Sending kernel consumer destroy channel key %d", channel->fd);
463
464 memset(&msg, 0, sizeof(msg));
465 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
466 msg.u.destroy_channel.key = channel->fd;
467
468 pthread_mutex_lock(socket->lock);
469 health_code_update();
470
471 ret = consumer_send_msg(socket, &msg);
472 if (ret < 0) {
473 goto error;
474 }
475
476 error:
477 health_code_update();
478 pthread_mutex_unlock(socket->lock);
479 return ret;
480 }
481
482 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
483 struct ltt_kernel_metadata *metadata)
484 {
485 int ret;
486 struct lttcomm_consumer_msg msg;
487
488 assert(metadata);
489 assert(socket);
490
491 DBG("Sending kernel consumer destroy channel key %d", metadata->fd);
492
493 memset(&msg, 0, sizeof(msg));
494 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
495 msg.u.destroy_channel.key = metadata->fd;
496
497 pthread_mutex_lock(socket->lock);
498 health_code_update();
499
500 ret = consumer_send_msg(socket, &msg);
501 if (ret < 0) {
502 goto error;
503 }
504
505 error:
506 health_code_update();
507 pthread_mutex_unlock(socket->lock);
508 return ret;
509 }
This page took 0.063478 seconds and 5 git commands to generate.