consumerd: tag metadata channel as being part of a live session
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23
24 #include <common/common.h>
25 #include <common/defaults.h>
26 #include <common/compat/string.h>
27
28 #include "consumer.h"
29 #include "health-sessiond.h"
30 #include "kernel-consumer.h"
31
32 static char *create_channel_path(struct consumer_output *consumer,
33 uid_t uid, gid_t gid)
34 {
35 int ret;
36 char tmp_path[PATH_MAX];
37 char *pathname = NULL;
38
39 assert(consumer);
40
41 /* Get the right path name destination */
42 if (consumer->type == CONSUMER_DST_LOCAL) {
43 /* Set application path to the destination path */
44 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
45 consumer->dst.trace_path, consumer->subdir);
46 if (ret < 0) {
47 PERROR("snprintf kernel channel path");
48 goto error;
49 }
50 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
51 if (!pathname) {
52 PERROR("lttng_strndup");
53 goto error;
54 }
55
56 /* Create directory */
57 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
58 if (ret < 0) {
59 if (errno != EEXIST) {
60 ERR("Trace directory creation error");
61 goto error;
62 }
63 }
64 DBG3("Kernel local consumer tracefile path: %s", pathname);
65 } else {
66 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
67 if (ret < 0) {
68 PERROR("snprintf kernel metadata path");
69 goto error;
70 }
71 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
72 if (!pathname) {
73 PERROR("lttng_strndup");
74 goto error;
75 }
76 DBG3("Kernel network consumer subdir path: %s", pathname);
77 }
78
79 return pathname;
80
81 error:
82 free(pathname);
83 return NULL;
84 }
85
86 /*
87 * Sending a single channel to the consumer with command ADD_CHANNEL.
88 */
89 int kernel_consumer_add_channel(struct consumer_socket *sock,
90 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
91 unsigned int monitor)
92 {
93 int ret;
94 char *pathname;
95 struct lttcomm_consumer_msg lkm;
96 struct consumer_output *consumer;
97
98 /* Safety net */
99 assert(channel);
100 assert(session);
101 assert(session->consumer);
102
103 consumer = session->consumer;
104
105 DBG("Kernel consumer adding channel %s to kernel consumer",
106 channel->channel->name);
107
108 if (monitor) {
109 pathname = create_channel_path(consumer, session->uid, session->gid);
110 } else {
111 /* Empty path. */
112 pathname = strdup("");
113 }
114 if (!pathname) {
115 ret = -1;
116 goto error;
117 }
118
119 /* Prep channel message structure */
120 consumer_init_channel_comm_msg(&lkm,
121 LTTNG_CONSUMER_ADD_CHANNEL,
122 channel->fd,
123 session->id,
124 pathname,
125 session->uid,
126 session->gid,
127 consumer->net_seq_index,
128 channel->channel->name,
129 channel->stream_count,
130 channel->channel->attr.output,
131 CONSUMER_CHANNEL_TYPE_DATA,
132 channel->channel->attr.tracefile_size,
133 channel->channel->attr.tracefile_count,
134 monitor,
135 channel->channel->attr.live_timer_interval,
136 session->is_live_session);
137
138 health_code_update();
139
140 ret = consumer_send_channel(sock, &lkm);
141 if (ret < 0) {
142 goto error;
143 }
144
145 health_code_update();
146
147 error:
148 free(pathname);
149 return ret;
150 }
151
152 /*
153 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
154 *
155 * The consumer socket lock must be held by the caller.
156 */
157 int kernel_consumer_add_metadata(struct consumer_socket *sock,
158 struct ltt_kernel_session *session, unsigned int monitor)
159 {
160 int ret;
161 char *pathname;
162 struct lttcomm_consumer_msg lkm;
163 struct consumer_output *consumer;
164
165 /* Safety net */
166 assert(session);
167 assert(session->consumer);
168 assert(sock);
169
170 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
171
172 /* Get consumer output pointer */
173 consumer = session->consumer;
174
175 if (monitor) {
176 pathname = create_channel_path(consumer, session->uid, session->gid);
177 } else {
178 /* Empty path. */
179 pathname = strdup("");
180 }
181 if (!pathname) {
182 ret = -1;
183 goto error;
184 }
185
186 /* Prep channel message structure */
187 consumer_init_channel_comm_msg(&lkm,
188 LTTNG_CONSUMER_ADD_CHANNEL,
189 session->metadata->fd,
190 session->id,
191 pathname,
192 session->uid,
193 session->gid,
194 consumer->net_seq_index,
195 DEFAULT_METADATA_NAME,
196 1,
197 DEFAULT_KERNEL_CHANNEL_OUTPUT,
198 CONSUMER_CHANNEL_TYPE_METADATA,
199 0, 0,
200 monitor, 0, session->is_live_session);
201
202 health_code_update();
203
204 ret = consumer_send_channel(sock, &lkm);
205 if (ret < 0) {
206 goto error;
207 }
208
209 health_code_update();
210
211 /* Prep stream message structure */
212 consumer_init_stream_comm_msg(&lkm,
213 LTTNG_CONSUMER_ADD_STREAM,
214 session->metadata->fd,
215 session->metadata_stream_fd,
216 0); /* CPU: 0 for metadata. */
217
218 health_code_update();
219
220 /* Send stream and file descriptor */
221 ret = consumer_send_stream(sock, consumer, &lkm,
222 &session->metadata_stream_fd, 1);
223 if (ret < 0) {
224 goto error;
225 }
226
227 health_code_update();
228
229 error:
230 free(pathname);
231 return ret;
232 }
233
234 /*
235 * Sending a single stream to the consumer with command ADD_STREAM.
236 */
237 int kernel_consumer_add_stream(struct consumer_socket *sock,
238 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
239 struct ltt_kernel_session *session, unsigned int monitor)
240 {
241 int ret;
242 struct lttcomm_consumer_msg lkm;
243 struct consumer_output *consumer;
244
245 assert(channel);
246 assert(stream);
247 assert(session);
248 assert(session->consumer);
249 assert(sock);
250
251 DBG("Sending stream %d of channel %s to kernel consumer",
252 stream->fd, channel->channel->name);
253
254 /* Get consumer output pointer */
255 consumer = session->consumer;
256
257 /* Prep stream consumer message */
258 consumer_init_stream_comm_msg(&lkm,
259 LTTNG_CONSUMER_ADD_STREAM,
260 channel->fd,
261 stream->fd,
262 stream->cpu);
263
264 health_code_update();
265
266 /* Send stream and file descriptor */
267 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
268 if (ret < 0) {
269 goto error;
270 }
271
272 health_code_update();
273
274 error:
275 return ret;
276 }
277
278 /*
279 * Sending the notification that all streams were sent with STREAMS_SENT.
280 */
281 int kernel_consumer_streams_sent(struct consumer_socket *sock,
282 struct ltt_kernel_session *session, uint64_t channel_key)
283 {
284 int ret;
285 struct lttcomm_consumer_msg lkm;
286 struct consumer_output *consumer;
287
288 assert(sock);
289 assert(session);
290
291 DBG("Sending streams_sent");
292 /* Get consumer output pointer */
293 consumer = session->consumer;
294
295 /* Prep stream consumer message */
296 consumer_init_streams_sent_comm_msg(&lkm,
297 LTTNG_CONSUMER_STREAMS_SENT,
298 channel_key, consumer->net_seq_index);
299
300 health_code_update();
301
302 /* Send stream and file descriptor */
303 ret = consumer_send_msg(sock, &lkm);
304 if (ret < 0) {
305 goto error;
306 }
307
308 error:
309 return ret;
310 }
311
312 /*
313 * Send all stream fds of kernel channel to the consumer.
314 *
315 * The consumer socket lock must be held by the caller.
316 */
317 int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
318 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
319 unsigned int monitor)
320 {
321 int ret = LTTNG_OK;
322 struct ltt_kernel_stream *stream;
323
324 /* Safety net */
325 assert(channel);
326 assert(session);
327 assert(session->consumer);
328 assert(sock);
329
330 /* Bail out if consumer is disabled */
331 if (!session->consumer->enabled) {
332 ret = LTTNG_OK;
333 goto error;
334 }
335
336 DBG("Sending streams of channel %s to kernel consumer",
337 channel->channel->name);
338
339 if (!channel->sent_to_consumer) {
340 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
341 if (ret < 0) {
342 goto error;
343 }
344 channel->sent_to_consumer = true;
345 }
346
347 /* Send streams */
348 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
349 if (!stream->fd || stream->sent_to_consumer) {
350 continue;
351 }
352
353 /* Add stream on the kernel consumer side. */
354 ret = kernel_consumer_add_stream(sock, channel, stream, session,
355 monitor);
356 if (ret < 0) {
357 goto error;
358 }
359 stream->sent_to_consumer = true;
360 }
361
362 error:
363 return ret;
364 }
365
366 /*
367 * Send all stream fds of the kernel session to the consumer.
368 *
369 * The consumer socket lock must be held by the caller.
370 */
371 int kernel_consumer_send_session(struct consumer_socket *sock,
372 struct ltt_kernel_session *session)
373 {
374 int ret, monitor = 0;
375 struct ltt_kernel_channel *chan;
376
377 /* Safety net */
378 assert(session);
379 assert(session->consumer);
380 assert(sock);
381
382 /* Bail out if consumer is disabled */
383 if (!session->consumer->enabled) {
384 ret = LTTNG_OK;
385 goto error;
386 }
387
388 /* Don't monitor the streams on the consumer if in flight recorder. */
389 if (session->output_traces) {
390 monitor = 1;
391 }
392
393 DBG("Sending session stream to kernel consumer");
394
395 if (session->metadata_stream_fd >= 0 && session->metadata) {
396 ret = kernel_consumer_add_metadata(sock, session, monitor);
397 if (ret < 0) {
398 goto error;
399 }
400 }
401
402 /* Send channel and streams of it */
403 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
404 ret = kernel_consumer_send_channel_stream(sock, chan, session,
405 monitor);
406 if (ret < 0) {
407 goto error;
408 }
409 if (monitor) {
410 /*
411 * Inform the relay that all the streams for the
412 * channel were sent.
413 */
414 ret = kernel_consumer_streams_sent(sock, session, chan->fd);
415 if (ret < 0) {
416 goto error;
417 }
418 }
419 }
420
421 DBG("Kernel consumer FDs of metadata and channel streams sent");
422
423 session->consumer_fds_sent = 1;
424 return 0;
425
426 error:
427 return ret;
428 }
429
430 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
431 struct ltt_kernel_channel *channel)
432 {
433 int ret;
434 struct lttcomm_consumer_msg msg;
435
436 assert(channel);
437 assert(socket);
438
439 DBG("Sending kernel consumer destroy channel key %d", channel->fd);
440
441 memset(&msg, 0, sizeof(msg));
442 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
443 msg.u.destroy_channel.key = channel->fd;
444
445 pthread_mutex_lock(socket->lock);
446 health_code_update();
447
448 ret = consumer_send_msg(socket, &msg);
449 if (ret < 0) {
450 goto error;
451 }
452
453 error:
454 health_code_update();
455 pthread_mutex_unlock(socket->lock);
456 return ret;
457 }
458
459 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
460 struct ltt_kernel_metadata *metadata)
461 {
462 int ret;
463 struct lttcomm_consumer_msg msg;
464
465 assert(metadata);
466 assert(socket);
467
468 DBG("Sending kernel consumer destroy channel key %d", metadata->fd);
469
470 memset(&msg, 0, sizeof(msg));
471 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
472 msg.u.destroy_channel.key = metadata->fd;
473
474 pthread_mutex_lock(socket->lock);
475 health_code_update();
476
477 ret = consumer_send_msg(socket, &msg);
478 if (ret < 0) {
479 goto error;
480 }
481
482 error:
483 health_code_update();
484 pthread_mutex_unlock(socket->lock);
485 return ret;
486 }
This page took 0.041686 seconds and 5 git commands to generate.