2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
15 #include <common/common.hpp>
16 #include <common/defaults.hpp>
17 #include <common/compat/string.hpp>
19 #include "consumer.hpp"
20 #include "health-sessiond.hpp"
21 #include "kernel-consumer.hpp"
22 #include "notification-thread-commands.hpp"
23 #include "session.hpp"
24 #include "lttng-sessiond.hpp"
26 static char *create_channel_path(struct consumer_output
*consumer
,
27 size_t *consumer_path_offset
)
30 char tmp_path
[PATH_MAX
];
31 char *pathname
= NULL
;
33 LTTNG_ASSERT(consumer
);
35 /* Get the right path name destination */
36 if (consumer
->type
== CONSUMER_DST_LOCAL
||
37 (consumer
->type
== CONSUMER_DST_NET
&&
38 consumer
->relay_major_version
== 2 &&
39 consumer
->relay_minor_version
>= 11)) {
40 pathname
= strdup(consumer
->domain_subdir
);
42 PERROR("Failed to copy domain subdirectory string %s",
43 consumer
->domain_subdir
);
46 *consumer_path_offset
= strlen(consumer
->domain_subdir
);
47 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
50 /* Network output, relayd < 2.11. */
51 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s%s",
52 consumer
->dst
.net
.base_dir
,
53 consumer
->domain_subdir
);
55 PERROR("snprintf kernel metadata path");
57 } else if (ret
>= sizeof(tmp_path
)) {
58 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
59 sizeof(tmp_path
), ret
,
60 consumer
->dst
.net
.base_dir
,
61 consumer
->domain_subdir
);
64 pathname
= lttng_strndup(tmp_path
, sizeof(tmp_path
));
66 PERROR("lttng_strndup");
69 *consumer_path_offset
= 0;
70 DBG3("Kernel network consumer subdir path: %s", pathname
);
81 * Sending a single channel to the consumer with command ADD_CHANNEL.
84 int kernel_consumer_add_channel(struct consumer_socket
*sock
,
85 struct ltt_kernel_channel
*channel
,
86 struct ltt_kernel_session
*ksession
,
90 char *pathname
= NULL
;
91 struct lttcomm_consumer_msg lkm
;
92 struct consumer_output
*consumer
;
93 enum lttng_error_code status
;
94 struct ltt_session
*session
= NULL
;
95 struct lttng_channel_extended
*channel_attr_extended
;
97 size_t consumer_path_offset
= 0;
100 LTTNG_ASSERT(channel
);
101 LTTNG_ASSERT(ksession
);
102 LTTNG_ASSERT(ksession
->consumer
);
104 consumer
= ksession
->consumer
;
105 channel_attr_extended
= (struct lttng_channel_extended
*)
106 channel
->channel
->attr
.extended
.ptr
;
108 DBG("Kernel consumer adding channel %s to kernel consumer",
109 channel
->channel
->name
);
110 is_local_trace
= consumer
->net_seq_index
== -1ULL;
112 pathname
= create_channel_path(consumer
, &consumer_path_offset
);
118 if (is_local_trace
&& ksession
->current_trace_chunk
) {
119 enum lttng_trace_chunk_status chunk_status
;
120 char *pathname_index
;
122 ret
= asprintf(&pathname_index
, "%s/" DEFAULT_INDEX_DIR
,
125 ERR("Failed to format channel index directory");
131 * Create the index subdirectory which will take care
132 * of implicitly creating the channel's path.
134 chunk_status
= lttng_trace_chunk_create_subdirectory(
135 ksession
->current_trace_chunk
, pathname_index
);
136 free(pathname_index
);
137 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
143 /* Prep channel message structure */
144 consumer_init_add_channel_comm_msg(&lkm
, channel
->key
, ksession
->id
,
145 &pathname
[consumer_path_offset
], consumer
->net_seq_index
,
146 channel
->channel
->name
, channel
->stream_count
,
147 channel
->channel
->attr
.output
, CONSUMER_CHANNEL_TYPE_DATA
,
148 channel
->channel
->attr
.tracefile_size
,
149 channel
->channel
->attr
.tracefile_count
, monitor
,
150 channel
->channel
->attr
.live_timer_interval
, ksession
->is_live_session
,
151 channel_attr_extended
->monitor_timer_interval
,
152 ksession
->current_trace_chunk
, *ksession
->trace_format
);
154 health_code_update();
156 ret
= consumer_send_channel(sock
, &lkm
);
161 health_code_update();
163 session
= session_find_by_id(ksession
->id
);
164 LTTNG_ASSERT(session
);
165 ASSERT_LOCKED(session
->lock
);
166 ASSERT_SESSION_LIST_LOCKED();
168 status
= notification_thread_command_add_channel(the_notification_thread_handle
,
169 session
->id
, channel
->channel
->name
, channel
->key
, LTTNG_DOMAIN_KERNEL
,
170 channel
->channel
->attr
.subbuf_size
* channel
->channel
->attr
.num_subbuf
);
172 if (status
!= LTTNG_OK
) {
177 channel
->published_to_notification_thread
= true;
181 session_put(session
);
188 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
190 * The consumer socket lock must be held by the caller.
192 int kernel_consumer_add_metadata(struct consumer_socket
*sock
,
193 struct ltt_kernel_session
*ksession
, unsigned int monitor
)
196 struct lttcomm_consumer_msg lkm
;
197 struct consumer_output
*consumer
;
202 LTTNG_ASSERT(ksession
);
203 LTTNG_ASSERT(ksession
->consumer
);
206 DBG("Sending metadata %d to kernel consumer",
207 ksession
->metadata_stream_fd
);
209 /* Get consumer output pointer */
210 consumer
= ksession
->consumer
;
212 /* Prep channel message structure */
213 consumer_init_add_channel_comm_msg(&lkm
, ksession
->metadata
->key
, ksession
->id
, "",
214 consumer
->net_seq_index
, ksession
->metadata
->conf
->name
, 1,
215 ksession
->metadata
->conf
->attr
.output
, CONSUMER_CHANNEL_TYPE_METADATA
,
216 ksession
->metadata
->conf
->attr
.tracefile_size
,
217 ksession
->metadata
->conf
->attr
.tracefile_count
, monitor
,
218 ksession
->metadata
->conf
->attr
.live_timer_interval
,
219 ksession
->is_live_session
, 0, ksession
->current_trace_chunk
,
220 *ksession
->trace_format
);
222 health_code_update();
224 ret
= consumer_send_channel(sock
, &lkm
);
229 health_code_update();
231 /* Prep stream message structure */
232 consumer_init_add_stream_comm_msg(&lkm
,
233 ksession
->metadata
->key
,
234 ksession
->metadata_stream_fd
,
235 0 /* CPU: 0 for metadata. */);
237 health_code_update();
239 /* Send stream and file descriptor */
240 ret
= consumer_send_stream(sock
, consumer
, &lkm
,
241 &ksession
->metadata_stream_fd
, 1);
246 health_code_update();
254 * Sending a single stream to the consumer with command ADD_STREAM.
257 int kernel_consumer_add_stream(struct consumer_socket
*sock
,
258 struct ltt_kernel_channel
*channel
,
259 struct ltt_kernel_stream
*stream
,
260 struct ltt_kernel_session
*session
)
263 struct lttcomm_consumer_msg lkm
;
264 struct consumer_output
*consumer
;
266 LTTNG_ASSERT(channel
);
267 LTTNG_ASSERT(stream
);
268 LTTNG_ASSERT(session
);
269 LTTNG_ASSERT(session
->consumer
);
272 DBG("Sending stream %d of channel %s to kernel consumer",
273 stream
->fd
, channel
->channel
->name
);
275 /* Get consumer output pointer */
276 consumer
= session
->consumer
;
278 /* Prep stream consumer message */
279 consumer_init_add_stream_comm_msg(&lkm
,
284 health_code_update();
286 /* Send stream and file descriptor */
287 ret
= consumer_send_stream(sock
, consumer
, &lkm
, &stream
->fd
, 1);
292 health_code_update();
299 * Sending the notification that all streams were sent with STREAMS_SENT.
301 int kernel_consumer_streams_sent(struct consumer_socket
*sock
,
302 struct ltt_kernel_session
*session
, uint64_t channel_key
)
305 struct lttcomm_consumer_msg lkm
;
306 struct consumer_output
*consumer
;
309 LTTNG_ASSERT(session
);
311 DBG("Sending streams_sent");
312 /* Get consumer output pointer */
313 consumer
= session
->consumer
;
315 /* Prep stream consumer message */
316 consumer_init_streams_sent_comm_msg(&lkm
,
317 LTTNG_CONSUMER_STREAMS_SENT
,
318 channel_key
, consumer
->net_seq_index
);
320 health_code_update();
322 /* Send stream and file descriptor */
323 ret
= consumer_send_msg(sock
, &lkm
);
333 * Send all stream fds of kernel channel to the consumer.
335 * The consumer socket lock must be held by the caller.
337 int kernel_consumer_send_channel_streams(struct consumer_socket
*sock
,
338 struct ltt_kernel_channel
*channel
, struct ltt_kernel_session
*ksession
,
339 unsigned int monitor
)
342 struct ltt_kernel_stream
*stream
;
345 LTTNG_ASSERT(channel
);
346 LTTNG_ASSERT(ksession
);
347 LTTNG_ASSERT(ksession
->consumer
);
352 /* Bail out if consumer is disabled */
353 if (!ksession
->consumer
->enabled
) {
358 DBG("Sending streams of channel %s to kernel consumer",
359 channel
->channel
->name
);
361 if (!channel
->sent_to_consumer
) {
362 ret
= kernel_consumer_add_channel(sock
, channel
, ksession
, monitor
);
366 channel
->sent_to_consumer
= true;
370 cds_list_for_each_entry(stream
, &channel
->stream_list
.head
, list
) {
371 if (!stream
->fd
|| stream
->sent_to_consumer
) {
375 /* Add stream on the kernel consumer side. */
376 ret
= kernel_consumer_add_stream(sock
, channel
, stream
,
381 stream
->sent_to_consumer
= true;
390 * Send all stream fds of the kernel session to the consumer.
392 * The consumer socket lock must be held by the caller.
394 int kernel_consumer_send_session(struct consumer_socket
*sock
,
395 struct ltt_kernel_session
*session
)
397 int ret
, monitor
= 0;
398 struct ltt_kernel_channel
*chan
;
401 LTTNG_ASSERT(session
);
402 LTTNG_ASSERT(session
->consumer
);
405 /* Bail out if consumer is disabled */
406 if (!session
->consumer
->enabled
) {
411 /* Don't monitor the streams on the consumer if in flight recorder. */
412 if (session
->output_traces
) {
416 DBG("Sending session stream to kernel consumer");
418 if (session
->metadata_stream_fd
>= 0 && session
->metadata
) {
419 ret
= kernel_consumer_add_metadata(sock
, session
, monitor
);
425 /* Send channel and streams of it */
426 cds_list_for_each_entry(chan
, &session
->channel_list
.head
, list
) {
427 ret
= kernel_consumer_send_channel_streams(sock
, chan
, session
,
434 * Inform the relay that all the streams for the
437 ret
= kernel_consumer_streams_sent(sock
, session
, chan
->key
);
444 DBG("Kernel consumer FDs of metadata and channel streams sent");
446 session
->consumer_fds_sent
= 1;
453 int kernel_consumer_destroy_channel(struct consumer_socket
*socket
,
454 struct ltt_kernel_channel
*channel
)
457 struct lttcomm_consumer_msg msg
;
459 LTTNG_ASSERT(channel
);
460 LTTNG_ASSERT(socket
);
462 DBG("Sending kernel consumer destroy channel key %" PRIu64
, channel
->key
);
464 memset(&msg
, 0, sizeof(msg
));
465 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
466 msg
.u
.destroy_channel
.key
= channel
->key
;
468 pthread_mutex_lock(socket
->lock
);
469 health_code_update();
471 ret
= consumer_send_msg(socket
, &msg
);
477 health_code_update();
478 pthread_mutex_unlock(socket
->lock
);
482 int kernel_consumer_destroy_metadata(struct consumer_socket
*socket
,
483 struct ltt_kernel_metadata
*metadata
)
486 struct lttcomm_consumer_msg msg
;
488 LTTNG_ASSERT(metadata
);
489 LTTNG_ASSERT(socket
);
491 DBG("Sending kernel consumer destroy channel key %" PRIu64
, metadata
->key
);
493 memset(&msg
, 0, sizeof(msg
));
494 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
495 msg
.u
.destroy_channel
.key
= metadata
->key
;
497 pthread_mutex_lock(socket
->lock
);
498 health_code_update();
500 ret
= consumer_send_msg(socket
, &msg
);
506 health_code_update();
507 pthread_mutex_unlock(socket
->lock
);