d21b1868aaf924e9199b8ad28bff846061d4dff2
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23
24 #include <common/common.h>
25 #include <common/defaults.h>
26 #include <common/compat/string.h>
27
28 #include "consumer.h"
29 #include "health-sessiond.h"
30 #include "kernel-consumer.h"
31
32 static char *create_channel_path(struct consumer_output *consumer,
33 uid_t uid, gid_t gid)
34 {
35 int ret;
36 char tmp_path[PATH_MAX];
37 char *pathname = NULL;
38
39 assert(consumer);
40
41 /* Get the right path name destination */
42 if (consumer->type == CONSUMER_DST_LOCAL) {
43 /* Set application path to the destination path */
44 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
45 consumer->dst.trace_path, consumer->subdir);
46 if (ret < 0) {
47 PERROR("snprintf kernel channel path");
48 goto error;
49 }
50 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
51 if (!pathname) {
52 PERROR("lttng_strndup");
53 goto error;
54 }
55
56 /* Create directory */
57 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
58 if (ret < 0) {
59 if (errno != EEXIST) {
60 ERR("Trace directory creation error");
61 goto error;
62 }
63 }
64 DBG3("Kernel local consumer tracefile path: %s", pathname);
65 } else {
66 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
67 if (ret < 0) {
68 PERROR("snprintf kernel metadata path");
69 goto error;
70 }
71 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
72 if (!pathname) {
73 PERROR("lttng_strndup");
74 goto error;
75 }
76 DBG3("Kernel network consumer subdir path: %s", pathname);
77 }
78
79 return pathname;
80
81 error:
82 free(pathname);
83 return NULL;
84 }
85
86 /*
87 * Sending a single channel to the consumer with command ADD_CHANNEL.
88 */
89 int kernel_consumer_add_channel(struct consumer_socket *sock,
90 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
91 unsigned int monitor)
92 {
93 int ret;
94 char *pathname;
95 struct lttcomm_consumer_msg lkm;
96 struct consumer_output *consumer;
97
98 /* Safety net */
99 assert(channel);
100 assert(session);
101 assert(session->consumer);
102
103 consumer = session->consumer;
104
105 DBG("Kernel consumer adding channel %s to kernel consumer",
106 channel->channel->name);
107
108 if (monitor) {
109 pathname = create_channel_path(consumer, session->uid, session->gid);
110 } else {
111 /* Empty path. */
112 pathname = strdup("");
113 }
114 if (!pathname) {
115 ret = -1;
116 goto error;
117 }
118
119 /* Prep channel message structure */
120 consumer_init_channel_comm_msg(&lkm,
121 LTTNG_CONSUMER_ADD_CHANNEL,
122 channel->fd,
123 session->id,
124 pathname,
125 session->uid,
126 session->gid,
127 consumer->net_seq_index,
128 channel->channel->name,
129 channel->stream_count,
130 channel->channel->attr.output,
131 CONSUMER_CHANNEL_TYPE_DATA,
132 channel->channel->attr.tracefile_size,
133 channel->channel->attr.tracefile_count,
134 monitor,
135 channel->channel->attr.live_timer_interval);
136
137 health_code_update();
138
139 ret = consumer_send_channel(sock, &lkm);
140 if (ret < 0) {
141 goto error;
142 }
143
144 health_code_update();
145
146 error:
147 free(pathname);
148 return ret;
149 }
150
151 /*
152 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
153 *
154 * The consumer socket lock must be held by the caller.
155 */
156 int kernel_consumer_add_metadata(struct consumer_socket *sock,
157 struct ltt_kernel_session *session, unsigned int monitor)
158 {
159 int ret;
160 char *pathname;
161 struct lttcomm_consumer_msg lkm;
162 struct consumer_output *consumer;
163
164 /* Safety net */
165 assert(session);
166 assert(session->consumer);
167 assert(sock);
168
169 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
170
171 /* Get consumer output pointer */
172 consumer = session->consumer;
173
174 if (monitor) {
175 pathname = create_channel_path(consumer, session->uid, session->gid);
176 } else {
177 /* Empty path. */
178 pathname = strdup("");
179 }
180 if (!pathname) {
181 ret = -1;
182 goto error;
183 }
184
185 /* Prep channel message structure */
186 consumer_init_channel_comm_msg(&lkm,
187 LTTNG_CONSUMER_ADD_CHANNEL,
188 session->metadata->fd,
189 session->id,
190 pathname,
191 session->uid,
192 session->gid,
193 consumer->net_seq_index,
194 DEFAULT_METADATA_NAME,
195 1,
196 DEFAULT_KERNEL_CHANNEL_OUTPUT,
197 CONSUMER_CHANNEL_TYPE_METADATA,
198 0, 0,
199 monitor, 0);
200
201 health_code_update();
202
203 ret = consumer_send_channel(sock, &lkm);
204 if (ret < 0) {
205 goto error;
206 }
207
208 health_code_update();
209
210 /* Prep stream message structure */
211 consumer_init_stream_comm_msg(&lkm,
212 LTTNG_CONSUMER_ADD_STREAM,
213 session->metadata->fd,
214 session->metadata_stream_fd,
215 0); /* CPU: 0 for metadata. */
216
217 health_code_update();
218
219 /* Send stream and file descriptor */
220 ret = consumer_send_stream(sock, consumer, &lkm,
221 &session->metadata_stream_fd, 1);
222 if (ret < 0) {
223 goto error;
224 }
225
226 health_code_update();
227
228 error:
229 free(pathname);
230 return ret;
231 }
232
233 /*
234 * Sending a single stream to the consumer with command ADD_STREAM.
235 */
236 int kernel_consumer_add_stream(struct consumer_socket *sock,
237 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
238 struct ltt_kernel_session *session, unsigned int monitor)
239 {
240 int ret;
241 struct lttcomm_consumer_msg lkm;
242 struct consumer_output *consumer;
243
244 assert(channel);
245 assert(stream);
246 assert(session);
247 assert(session->consumer);
248 assert(sock);
249
250 DBG("Sending stream %d of channel %s to kernel consumer",
251 stream->fd, channel->channel->name);
252
253 /* Get consumer output pointer */
254 consumer = session->consumer;
255
256 /* Prep stream consumer message */
257 consumer_init_stream_comm_msg(&lkm,
258 LTTNG_CONSUMER_ADD_STREAM,
259 channel->fd,
260 stream->fd,
261 stream->cpu);
262
263 health_code_update();
264
265 /* Send stream and file descriptor */
266 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
267 if (ret < 0) {
268 goto error;
269 }
270
271 health_code_update();
272
273 error:
274 return ret;
275 }
276
277 /*
278 * Sending the notification that all streams were sent with STREAMS_SENT.
279 */
280 int kernel_consumer_streams_sent(struct consumer_socket *sock,
281 struct ltt_kernel_session *session, uint64_t channel_key)
282 {
283 int ret;
284 struct lttcomm_consumer_msg lkm;
285 struct consumer_output *consumer;
286
287 assert(sock);
288 assert(session);
289
290 DBG("Sending streams_sent");
291 /* Get consumer output pointer */
292 consumer = session->consumer;
293
294 /* Prep stream consumer message */
295 consumer_init_streams_sent_comm_msg(&lkm,
296 LTTNG_CONSUMER_STREAMS_SENT,
297 channel_key, consumer->net_seq_index);
298
299 health_code_update();
300
301 /* Send stream and file descriptor */
302 ret = consumer_send_msg(sock, &lkm);
303 if (ret < 0) {
304 goto error;
305 }
306
307 error:
308 return ret;
309 }
310
311 /*
312 * Send all stream fds of kernel channel to the consumer.
313 *
314 * The consumer socket lock must be held by the caller.
315 */
316 int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
317 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
318 unsigned int monitor)
319 {
320 int ret = LTTNG_OK;
321 struct ltt_kernel_stream *stream;
322
323 /* Safety net */
324 assert(channel);
325 assert(session);
326 assert(session->consumer);
327 assert(sock);
328
329 /* Bail out if consumer is disabled */
330 if (!session->consumer->enabled) {
331 ret = LTTNG_OK;
332 goto error;
333 }
334
335 DBG("Sending streams of channel %s to kernel consumer",
336 channel->channel->name);
337
338 if (!channel->sent_to_consumer) {
339 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
340 if (ret < 0) {
341 goto error;
342 }
343 channel->sent_to_consumer = true;
344 }
345
346 /* Send streams */
347 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
348 if (!stream->fd || stream->sent_to_consumer) {
349 continue;
350 }
351
352 /* Add stream on the kernel consumer side. */
353 ret = kernel_consumer_add_stream(sock, channel, stream, session,
354 monitor);
355 if (ret < 0) {
356 goto error;
357 }
358 stream->sent_to_consumer = true;
359 }
360
361 error:
362 return ret;
363 }
364
365 /*
366 * Send all stream fds of the kernel session to the consumer.
367 *
368 * The consumer socket lock must be held by the caller.
369 */
370 int kernel_consumer_send_session(struct consumer_socket *sock,
371 struct ltt_kernel_session *session)
372 {
373 int ret, monitor = 0;
374 struct ltt_kernel_channel *chan;
375
376 /* Safety net */
377 assert(session);
378 assert(session->consumer);
379 assert(sock);
380
381 /* Bail out if consumer is disabled */
382 if (!session->consumer->enabled) {
383 ret = LTTNG_OK;
384 goto error;
385 }
386
387 /* Don't monitor the streams on the consumer if in flight recorder. */
388 if (session->output_traces) {
389 monitor = 1;
390 }
391
392 DBG("Sending session stream to kernel consumer");
393
394 if (session->metadata_stream_fd >= 0 && session->metadata) {
395 ret = kernel_consumer_add_metadata(sock, session, monitor);
396 if (ret < 0) {
397 goto error;
398 }
399 }
400
401 /* Send channel and streams of it */
402 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
403 ret = kernel_consumer_send_channel_stream(sock, chan, session,
404 monitor);
405 if (ret < 0) {
406 goto error;
407 }
408 if (monitor) {
409 /*
410 * Inform the relay that all the streams for the
411 * channel were sent.
412 */
413 ret = kernel_consumer_streams_sent(sock, session, chan->fd);
414 if (ret < 0) {
415 goto error;
416 }
417 }
418 }
419
420 DBG("Kernel consumer FDs of metadata and channel streams sent");
421
422 session->consumer_fds_sent = 1;
423 return 0;
424
425 error:
426 return ret;
427 }
428
429 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
430 struct ltt_kernel_channel *channel)
431 {
432 int ret;
433 struct lttcomm_consumer_msg msg;
434
435 assert(channel);
436 assert(socket);
437
438 DBG("Sending kernel consumer destroy channel key %d", channel->fd);
439
440 memset(&msg, 0, sizeof(msg));
441 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
442 msg.u.destroy_channel.key = channel->fd;
443
444 pthread_mutex_lock(socket->lock);
445 health_code_update();
446
447 ret = consumer_send_msg(socket, &msg);
448 if (ret < 0) {
449 goto error;
450 }
451
452 error:
453 health_code_update();
454 pthread_mutex_unlock(socket->lock);
455 return ret;
456 }
457
458 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
459 struct ltt_kernel_metadata *metadata)
460 {
461 int ret;
462 struct lttcomm_consumer_msg msg;
463
464 assert(metadata);
465 assert(socket);
466
467 DBG("Sending kernel consumer destroy channel key %d", metadata->fd);
468
469 memset(&msg, 0, sizeof(msg));
470 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
471 msg.u.destroy_channel.key = metadata->fd;
472
473 pthread_mutex_lock(socket->lock);
474 health_code_update();
475
476 ret = consumer_send_msg(socket, &msg);
477 if (ret < 0) {
478 goto error;
479 }
480
481 error:
482 health_code_update();
483 pthread_mutex_unlock(socket->lock);
484 return ret;
485 }
This page took 0.041712 seconds and 4 git commands to generate.