2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 #include <common/common.h>
25 #include <common/defaults.h>
26 #include <common/compat/string.h>
29 #include "health-sessiond.h"
30 #include "kernel-consumer.h"
32 static char *create_channel_path(struct consumer_output
*consumer
,
36 char tmp_path
[PATH_MAX
];
37 char *pathname
= NULL
;
41 /* Get the right path name destination */
42 if (consumer
->type
== CONSUMER_DST_LOCAL
) {
43 /* Set application path to the destination path */
44 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s%s",
45 consumer
->dst
.trace_path
, consumer
->subdir
);
47 PERROR("snprintf kernel channel path");
50 pathname
= lttng_strndup(tmp_path
, sizeof(tmp_path
));
52 PERROR("lttng_strndup");
56 /* Create directory */
57 ret
= run_as_mkdir_recursive(pathname
, S_IRWXU
| S_IRWXG
, uid
, gid
);
59 if (errno
!= EEXIST
) {
60 ERR("Trace directory creation error");
64 DBG3("Kernel local consumer tracefile path: %s", pathname
);
66 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s", consumer
->subdir
);
68 PERROR("snprintf kernel metadata path");
71 pathname
= lttng_strndup(tmp_path
, sizeof(tmp_path
));
73 PERROR("lttng_strndup");
76 DBG3("Kernel network consumer subdir path: %s", pathname
);
87 * Sending a single channel to the consumer with command ADD_CHANNEL.
89 int kernel_consumer_add_channel(struct consumer_socket
*sock
,
90 struct ltt_kernel_channel
*channel
, struct ltt_kernel_session
*session
,
95 struct lttcomm_consumer_msg lkm
;
96 struct consumer_output
*consumer
;
101 assert(session
->consumer
);
103 consumer
= session
->consumer
;
105 DBG("Kernel consumer adding channel %s to kernel consumer",
106 channel
->channel
->name
);
109 pathname
= create_channel_path(consumer
, session
->uid
, session
->gid
);
112 pathname
= strdup("");
119 /* Prep channel message structure */
120 consumer_init_channel_comm_msg(&lkm
,
121 LTTNG_CONSUMER_ADD_CHANNEL
,
127 consumer
->net_seq_index
,
128 channel
->channel
->name
,
129 channel
->stream_count
,
130 channel
->channel
->attr
.output
,
131 CONSUMER_CHANNEL_TYPE_DATA
,
132 channel
->channel
->attr
.tracefile_size
,
133 channel
->channel
->attr
.tracefile_count
,
135 channel
->channel
->attr
.live_timer_interval
,
136 session
->is_live_session
);
138 health_code_update();
140 ret
= consumer_send_channel(sock
, &lkm
);
145 health_code_update();
153 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
155 * The consumer socket lock must be held by the caller.
157 int kernel_consumer_add_metadata(struct consumer_socket
*sock
,
158 struct ltt_kernel_session
*session
, unsigned int monitor
)
162 struct lttcomm_consumer_msg lkm
;
163 struct consumer_output
*consumer
;
167 assert(session
->consumer
);
170 DBG("Sending metadata %d to kernel consumer", session
->metadata_stream_fd
);
172 /* Get consumer output pointer */
173 consumer
= session
->consumer
;
176 pathname
= create_channel_path(consumer
, session
->uid
, session
->gid
);
179 pathname
= strdup("");
186 /* Prep channel message structure */
187 consumer_init_channel_comm_msg(&lkm
,
188 LTTNG_CONSUMER_ADD_CHANNEL
,
189 session
->metadata
->fd
,
194 consumer
->net_seq_index
,
195 session
->metadata
->conf
->name
,
197 session
->metadata
->conf
->attr
.output
,
198 CONSUMER_CHANNEL_TYPE_METADATA
,
199 session
->metadata
->conf
->attr
.tracefile_size
,
200 session
->metadata
->conf
->attr
.tracefile_count
,
202 session
->metadata
->conf
->attr
.live_timer_interval
,
203 session
->is_live_session
);
205 health_code_update();
207 ret
= consumer_send_channel(sock
, &lkm
);
212 health_code_update();
214 /* Prep stream message structure */
215 consumer_init_stream_comm_msg(&lkm
,
216 LTTNG_CONSUMER_ADD_STREAM
,
217 session
->metadata
->fd
,
218 session
->metadata_stream_fd
,
219 0); /* CPU: 0 for metadata. */
221 health_code_update();
223 /* Send stream and file descriptor */
224 ret
= consumer_send_stream(sock
, consumer
, &lkm
,
225 &session
->metadata_stream_fd
, 1);
230 health_code_update();
238 * Sending a single stream to the consumer with command ADD_STREAM.
240 int kernel_consumer_add_stream(struct consumer_socket
*sock
,
241 struct ltt_kernel_channel
*channel
, struct ltt_kernel_stream
*stream
,
242 struct ltt_kernel_session
*session
, unsigned int monitor
)
245 struct lttcomm_consumer_msg lkm
;
246 struct consumer_output
*consumer
;
251 assert(session
->consumer
);
254 DBG("Sending stream %d of channel %s to kernel consumer",
255 stream
->fd
, channel
->channel
->name
);
257 /* Get consumer output pointer */
258 consumer
= session
->consumer
;
260 /* Prep stream consumer message */
261 consumer_init_stream_comm_msg(&lkm
,
262 LTTNG_CONSUMER_ADD_STREAM
,
267 health_code_update();
269 /* Send stream and file descriptor */
270 ret
= consumer_send_stream(sock
, consumer
, &lkm
, &stream
->fd
, 1);
275 health_code_update();
282 * Sending the notification that all streams were sent with STREAMS_SENT.
284 int kernel_consumer_streams_sent(struct consumer_socket
*sock
,
285 struct ltt_kernel_session
*session
, uint64_t channel_key
)
288 struct lttcomm_consumer_msg lkm
;
289 struct consumer_output
*consumer
;
294 DBG("Sending streams_sent");
295 /* Get consumer output pointer */
296 consumer
= session
->consumer
;
298 /* Prep stream consumer message */
299 consumer_init_streams_sent_comm_msg(&lkm
,
300 LTTNG_CONSUMER_STREAMS_SENT
,
301 channel_key
, consumer
->net_seq_index
);
303 health_code_update();
305 /* Send stream and file descriptor */
306 ret
= consumer_send_msg(sock
, &lkm
);
316 * Send all stream fds of kernel channel to the consumer.
318 * The consumer socket lock must be held by the caller.
320 int kernel_consumer_send_channel_stream(struct consumer_socket
*sock
,
321 struct ltt_kernel_channel
*channel
, struct ltt_kernel_session
*session
,
322 unsigned int monitor
)
325 struct ltt_kernel_stream
*stream
;
330 assert(session
->consumer
);
333 /* Bail out if consumer is disabled */
334 if (!session
->consumer
->enabled
) {
339 DBG("Sending streams of channel %s to kernel consumer",
340 channel
->channel
->name
);
342 if (!channel
->sent_to_consumer
) {
343 ret
= kernel_consumer_add_channel(sock
, channel
, session
, monitor
);
347 channel
->sent_to_consumer
= true;
351 cds_list_for_each_entry(stream
, &channel
->stream_list
.head
, list
) {
352 if (!stream
->fd
|| stream
->sent_to_consumer
) {
356 /* Add stream on the kernel consumer side. */
357 ret
= kernel_consumer_add_stream(sock
, channel
, stream
, session
,
362 stream
->sent_to_consumer
= true;
370 * Send all stream fds of the kernel session to the consumer.
372 * The consumer socket lock must be held by the caller.
374 int kernel_consumer_send_session(struct consumer_socket
*sock
,
375 struct ltt_kernel_session
*session
)
377 int ret
, monitor
= 0;
378 struct ltt_kernel_channel
*chan
;
382 assert(session
->consumer
);
385 /* Bail out if consumer is disabled */
386 if (!session
->consumer
->enabled
) {
391 /* Don't monitor the streams on the consumer if in flight recorder. */
392 if (session
->output_traces
) {
396 DBG("Sending session stream to kernel consumer");
398 if (session
->metadata_stream_fd
>= 0 && session
->metadata
) {
399 ret
= kernel_consumer_add_metadata(sock
, session
, monitor
);
405 /* Send channel and streams of it */
406 cds_list_for_each_entry(chan
, &session
->channel_list
.head
, list
) {
407 ret
= kernel_consumer_send_channel_stream(sock
, chan
, session
,
414 * Inform the relay that all the streams for the
417 ret
= kernel_consumer_streams_sent(sock
, session
, chan
->fd
);
424 DBG("Kernel consumer FDs of metadata and channel streams sent");
426 session
->consumer_fds_sent
= 1;
433 int kernel_consumer_destroy_channel(struct consumer_socket
*socket
,
434 struct ltt_kernel_channel
*channel
)
437 struct lttcomm_consumer_msg msg
;
442 DBG("Sending kernel consumer destroy channel key %d", channel
->fd
);
444 memset(&msg
, 0, sizeof(msg
));
445 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
446 msg
.u
.destroy_channel
.key
= channel
->fd
;
448 pthread_mutex_lock(socket
->lock
);
449 health_code_update();
451 ret
= consumer_send_msg(socket
, &msg
);
457 health_code_update();
458 pthread_mutex_unlock(socket
->lock
);
462 int kernel_consumer_destroy_metadata(struct consumer_socket
*socket
,
463 struct ltt_kernel_metadata
*metadata
)
466 struct lttcomm_consumer_msg msg
;
471 DBG("Sending kernel consumer destroy channel key %d", metadata
->fd
);
473 memset(&msg
, 0, sizeof(msg
));
474 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
475 msg
.u
.destroy_channel
.key
= metadata
->fd
;
477 pthread_mutex_lock(socket
->lock
);
478 health_code_update();
480 ret
= consumer_send_msg(socket
, &msg
);
486 health_code_update();
487 pthread_mutex_unlock(socket
->lock
);