Propagate trace format all the way to the consumer
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.cpp
1 /*
2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <sys/stat.h>
12 #include <unistd.h>
13 #include <inttypes.h>
14
15 #include <common/common.hpp>
16 #include <common/defaults.hpp>
17 #include <common/compat/string.hpp>
18
19 #include "consumer.hpp"
20 #include "health-sessiond.hpp"
21 #include "kernel-consumer.hpp"
22 #include "notification-thread-commands.hpp"
23 #include "session.hpp"
24 #include "lttng-sessiond.hpp"
25
26 static char *create_channel_path(struct consumer_output *consumer,
27 size_t *consumer_path_offset)
28 {
29 int ret;
30 char tmp_path[PATH_MAX];
31 char *pathname = NULL;
32
33 LTTNG_ASSERT(consumer);
34
35 /* Get the right path name destination */
36 if (consumer->type == CONSUMER_DST_LOCAL ||
37 (consumer->type == CONSUMER_DST_NET &&
38 consumer->relay_major_version == 2 &&
39 consumer->relay_minor_version >= 11)) {
40 pathname = strdup(consumer->domain_subdir);
41 if (!pathname) {
42 PERROR("Failed to copy domain subdirectory string %s",
43 consumer->domain_subdir);
44 goto error;
45 }
46 *consumer_path_offset = strlen(consumer->domain_subdir);
47 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
48 pathname);
49 } else {
50 /* Network output, relayd < 2.11. */
51 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
52 consumer->dst.net.base_dir,
53 consumer->domain_subdir);
54 if (ret < 0) {
55 PERROR("snprintf kernel metadata path");
56 goto error;
57 } else if (ret >= sizeof(tmp_path)) {
58 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
59 sizeof(tmp_path), ret,
60 consumer->dst.net.base_dir,
61 consumer->domain_subdir);
62 goto error;
63 }
64 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
65 if (!pathname) {
66 PERROR("lttng_strndup");
67 goto error;
68 }
69 *consumer_path_offset = 0;
70 DBG3("Kernel network consumer subdir path: %s", pathname);
71 }
72
73 return pathname;
74
75 error:
76 free(pathname);
77 return NULL;
78 }
79
80 /*
81 * Sending a single channel to the consumer with command ADD_CHANNEL.
82 */
83 static
84 int kernel_consumer_add_channel(struct consumer_socket *sock,
85 struct ltt_kernel_channel *channel,
86 struct ltt_kernel_session *ksession,
87 unsigned int monitor)
88 {
89 int ret;
90 char *pathname = NULL;
91 struct lttcomm_consumer_msg lkm;
92 struct consumer_output *consumer;
93 enum lttng_error_code status;
94 struct ltt_session *session = NULL;
95 struct lttng_channel_extended *channel_attr_extended;
96 bool is_local_trace;
97 size_t consumer_path_offset = 0;
98
99 /* Safety net */
100 LTTNG_ASSERT(channel);
101 LTTNG_ASSERT(ksession);
102 LTTNG_ASSERT(ksession->consumer);
103
104 consumer = ksession->consumer;
105 channel_attr_extended = (struct lttng_channel_extended *)
106 channel->channel->attr.extended.ptr;
107
108 DBG("Kernel consumer adding channel %s to kernel consumer",
109 channel->channel->name);
110 is_local_trace = consumer->net_seq_index == -1ULL;
111
112 pathname = create_channel_path(consumer, &consumer_path_offset);
113 if (!pathname) {
114 ret = -1;
115 goto error;
116 }
117
118 if (is_local_trace && ksession->current_trace_chunk) {
119 enum lttng_trace_chunk_status chunk_status;
120 char *pathname_index;
121
122 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR,
123 pathname);
124 if (ret < 0) {
125 ERR("Failed to format channel index directory");
126 ret = -1;
127 goto error;
128 }
129
130 /*
131 * Create the index subdirectory which will take care
132 * of implicitly creating the channel's path.
133 */
134 chunk_status = lttng_trace_chunk_create_subdirectory(
135 ksession->current_trace_chunk, pathname_index);
136 free(pathname_index);
137 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
138 ret = -1;
139 goto error;
140 }
141 }
142
143 /* Prep channel message structure */
144 consumer_init_add_channel_comm_msg(&lkm, channel->key, ksession->id,
145 &pathname[consumer_path_offset], consumer->net_seq_index,
146 channel->channel->name, channel->stream_count,
147 channel->channel->attr.output, CONSUMER_CHANNEL_TYPE_DATA,
148 channel->channel->attr.tracefile_size,
149 channel->channel->attr.tracefile_count, monitor,
150 channel->channel->attr.live_timer_interval, ksession->is_live_session,
151 channel_attr_extended->monitor_timer_interval,
152 ksession->current_trace_chunk, *ksession->trace_format);
153
154 health_code_update();
155
156 ret = consumer_send_channel(sock, &lkm);
157 if (ret < 0) {
158 goto error;
159 }
160
161 health_code_update();
162 rcu_read_lock();
163 session = session_find_by_id(ksession->id);
164 LTTNG_ASSERT(session);
165 ASSERT_LOCKED(session->lock);
166 ASSERT_SESSION_LIST_LOCKED();
167
168 status = notification_thread_command_add_channel(the_notification_thread_handle,
169 session->id, channel->channel->name, channel->key, LTTNG_DOMAIN_KERNEL,
170 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
171 rcu_read_unlock();
172 if (status != LTTNG_OK) {
173 ret = -1;
174 goto error;
175 }
176
177 channel->published_to_notification_thread = true;
178
179 error:
180 if (session) {
181 session_put(session);
182 }
183 free(pathname);
184 return ret;
185 }
186
187 /*
188 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
189 *
190 * The consumer socket lock must be held by the caller.
191 */
192 int kernel_consumer_add_metadata(struct consumer_socket *sock,
193 struct ltt_kernel_session *ksession, unsigned int monitor)
194 {
195 int ret;
196 struct lttcomm_consumer_msg lkm;
197 struct consumer_output *consumer;
198
199 rcu_read_lock();
200
201 /* Safety net */
202 LTTNG_ASSERT(ksession);
203 LTTNG_ASSERT(ksession->consumer);
204 LTTNG_ASSERT(sock);
205
206 DBG("Sending metadata %d to kernel consumer",
207 ksession->metadata_stream_fd);
208
209 /* Get consumer output pointer */
210 consumer = ksession->consumer;
211
212 /* Prep channel message structure */
213 consumer_init_add_channel_comm_msg(&lkm, ksession->metadata->key, ksession->id, "",
214 consumer->net_seq_index, ksession->metadata->conf->name, 1,
215 ksession->metadata->conf->attr.output, CONSUMER_CHANNEL_TYPE_METADATA,
216 ksession->metadata->conf->attr.tracefile_size,
217 ksession->metadata->conf->attr.tracefile_count, monitor,
218 ksession->metadata->conf->attr.live_timer_interval,
219 ksession->is_live_session, 0, ksession->current_trace_chunk,
220 *ksession->trace_format);
221
222 health_code_update();
223
224 ret = consumer_send_channel(sock, &lkm);
225 if (ret < 0) {
226 goto error;
227 }
228
229 health_code_update();
230
231 /* Prep stream message structure */
232 consumer_init_add_stream_comm_msg(&lkm,
233 ksession->metadata->key,
234 ksession->metadata_stream_fd,
235 0 /* CPU: 0 for metadata. */);
236
237 health_code_update();
238
239 /* Send stream and file descriptor */
240 ret = consumer_send_stream(sock, consumer, &lkm,
241 &ksession->metadata_stream_fd, 1);
242 if (ret < 0) {
243 goto error;
244 }
245
246 health_code_update();
247
248 error:
249 rcu_read_unlock();
250 return ret;
251 }
252
253 /*
254 * Sending a single stream to the consumer with command ADD_STREAM.
255 */
256 static
257 int kernel_consumer_add_stream(struct consumer_socket *sock,
258 struct ltt_kernel_channel *channel,
259 struct ltt_kernel_stream *stream,
260 struct ltt_kernel_session *session)
261 {
262 int ret;
263 struct lttcomm_consumer_msg lkm;
264 struct consumer_output *consumer;
265
266 LTTNG_ASSERT(channel);
267 LTTNG_ASSERT(stream);
268 LTTNG_ASSERT(session);
269 LTTNG_ASSERT(session->consumer);
270 LTTNG_ASSERT(sock);
271
272 DBG("Sending stream %d of channel %s to kernel consumer",
273 stream->fd, channel->channel->name);
274
275 /* Get consumer output pointer */
276 consumer = session->consumer;
277
278 /* Prep stream consumer message */
279 consumer_init_add_stream_comm_msg(&lkm,
280 channel->key,
281 stream->fd,
282 stream->cpu);
283
284 health_code_update();
285
286 /* Send stream and file descriptor */
287 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
288 if (ret < 0) {
289 goto error;
290 }
291
292 health_code_update();
293
294 error:
295 return ret;
296 }
297
298 /*
299 * Sending the notification that all streams were sent with STREAMS_SENT.
300 */
301 int kernel_consumer_streams_sent(struct consumer_socket *sock,
302 struct ltt_kernel_session *session, uint64_t channel_key)
303 {
304 int ret;
305 struct lttcomm_consumer_msg lkm;
306 struct consumer_output *consumer;
307
308 LTTNG_ASSERT(sock);
309 LTTNG_ASSERT(session);
310
311 DBG("Sending streams_sent");
312 /* Get consumer output pointer */
313 consumer = session->consumer;
314
315 /* Prep stream consumer message */
316 consumer_init_streams_sent_comm_msg(&lkm,
317 LTTNG_CONSUMER_STREAMS_SENT,
318 channel_key, consumer->net_seq_index);
319
320 health_code_update();
321
322 /* Send stream and file descriptor */
323 ret = consumer_send_msg(sock, &lkm);
324 if (ret < 0) {
325 goto error;
326 }
327
328 error:
329 return ret;
330 }
331
332 /*
333 * Send all stream fds of kernel channel to the consumer.
334 *
335 * The consumer socket lock must be held by the caller.
336 */
337 int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
338 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
339 unsigned int monitor)
340 {
341 int ret = LTTNG_OK;
342 struct ltt_kernel_stream *stream;
343
344 /* Safety net */
345 LTTNG_ASSERT(channel);
346 LTTNG_ASSERT(ksession);
347 LTTNG_ASSERT(ksession->consumer);
348 LTTNG_ASSERT(sock);
349
350 rcu_read_lock();
351
352 /* Bail out if consumer is disabled */
353 if (!ksession->consumer->enabled) {
354 ret = LTTNG_OK;
355 goto error;
356 }
357
358 DBG("Sending streams of channel %s to kernel consumer",
359 channel->channel->name);
360
361 if (!channel->sent_to_consumer) {
362 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
363 if (ret < 0) {
364 goto error;
365 }
366 channel->sent_to_consumer = true;
367 }
368
369 /* Send streams */
370 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
371 if (!stream->fd || stream->sent_to_consumer) {
372 continue;
373 }
374
375 /* Add stream on the kernel consumer side. */
376 ret = kernel_consumer_add_stream(sock, channel, stream,
377 ksession);
378 if (ret < 0) {
379 goto error;
380 }
381 stream->sent_to_consumer = true;
382 }
383
384 error:
385 rcu_read_unlock();
386 return ret;
387 }
388
389 /*
390 * Send all stream fds of the kernel session to the consumer.
391 *
392 * The consumer socket lock must be held by the caller.
393 */
394 int kernel_consumer_send_session(struct consumer_socket *sock,
395 struct ltt_kernel_session *session)
396 {
397 int ret, monitor = 0;
398 struct ltt_kernel_channel *chan;
399
400 /* Safety net */
401 LTTNG_ASSERT(session);
402 LTTNG_ASSERT(session->consumer);
403 LTTNG_ASSERT(sock);
404
405 /* Bail out if consumer is disabled */
406 if (!session->consumer->enabled) {
407 ret = LTTNG_OK;
408 goto error;
409 }
410
411 /* Don't monitor the streams on the consumer if in flight recorder. */
412 if (session->output_traces) {
413 monitor = 1;
414 }
415
416 DBG("Sending session stream to kernel consumer");
417
418 if (session->metadata_stream_fd >= 0 && session->metadata) {
419 ret = kernel_consumer_add_metadata(sock, session, monitor);
420 if (ret < 0) {
421 goto error;
422 }
423 }
424
425 /* Send channel and streams of it */
426 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
427 ret = kernel_consumer_send_channel_streams(sock, chan, session,
428 monitor);
429 if (ret < 0) {
430 goto error;
431 }
432 if (monitor) {
433 /*
434 * Inform the relay that all the streams for the
435 * channel were sent.
436 */
437 ret = kernel_consumer_streams_sent(sock, session, chan->key);
438 if (ret < 0) {
439 goto error;
440 }
441 }
442 }
443
444 DBG("Kernel consumer FDs of metadata and channel streams sent");
445
446 session->consumer_fds_sent = 1;
447 return 0;
448
449 error:
450 return ret;
451 }
452
453 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
454 struct ltt_kernel_channel *channel)
455 {
456 int ret;
457 struct lttcomm_consumer_msg msg;
458
459 LTTNG_ASSERT(channel);
460 LTTNG_ASSERT(socket);
461
462 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
463
464 memset(&msg, 0, sizeof(msg));
465 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
466 msg.u.destroy_channel.key = channel->key;
467
468 pthread_mutex_lock(socket->lock);
469 health_code_update();
470
471 ret = consumer_send_msg(socket, &msg);
472 if (ret < 0) {
473 goto error;
474 }
475
476 error:
477 health_code_update();
478 pthread_mutex_unlock(socket->lock);
479 return ret;
480 }
481
482 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
483 struct ltt_kernel_metadata *metadata)
484 {
485 int ret;
486 struct lttcomm_consumer_msg msg;
487
488 LTTNG_ASSERT(metadata);
489 LTTNG_ASSERT(socket);
490
491 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
492
493 memset(&msg, 0, sizeof(msg));
494 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
495 msg.u.destroy_channel.key = metadata->key;
496
497 pthread_mutex_lock(socket->lock);
498 health_code_update();
499
500 ret = consumer_send_msg(socket, &msg);
501 if (ret < 0) {
502 goto error;
503 }
504
505 error:
506 health_code_update();
507 pthread_mutex_unlock(socket->lock);
508 return ret;
509 }
This page took 0.043098 seconds and 5 git commands to generate.