Document consumer socket locking assumptions
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
CommitLineData
f1e16794
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
f1e16794
DG
19#include <stdio.h>
20#include <stdlib.h>
f1e16794
DG
21#include <sys/stat.h>
22#include <unistd.h>
23
24#include <common/common.h>
25#include <common/defaults.h>
f5436bfc 26#include <common/compat/string.h>
f1e16794 27
00e2e675 28#include "consumer.h"
8782cc74 29#include "health-sessiond.h"
f1e16794
DG
30#include "kernel-consumer.h"
31
2bba9e53
DG
32static char *create_channel_path(struct consumer_output *consumer,
33 uid_t uid, gid_t gid)
00e2e675
DG
34{
35 int ret;
ffe60014 36 char tmp_path[PATH_MAX];
2bba9e53 37 char *pathname = NULL;
00e2e675 38
2bba9e53 39 assert(consumer);
00e2e675 40
ffe60014
DG
41 /* Get the right path name destination */
42 if (consumer->type == CONSUMER_DST_LOCAL) {
43 /* Set application path to the destination path */
dec56f6c 44 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
ffe60014
DG
45 consumer->dst.trace_path, consumer->subdir);
46 if (ret < 0) {
2bba9e53 47 PERROR("snprintf kernel channel path");
ffe60014
DG
48 goto error;
49 }
f5436bfc 50 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 51 if (!pathname) {
f5436bfc 52 PERROR("lttng_strndup");
bb3c4e70
MD
53 goto error;
54 }
ffe60014
DG
55
56 /* Create directory */
2bba9e53 57 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
ffe60014 58 if (ret < 0) {
df5b86c8 59 if (errno != EEXIST) {
ffe60014
DG
60 ERR("Trace directory creation error");
61 goto error;
62 }
63 }
64 DBG3("Kernel local consumer tracefile path: %s", pathname);
65 } else {
66 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
67 if (ret < 0) {
2bba9e53 68 PERROR("snprintf kernel metadata path");
ffe60014
DG
69 goto error;
70 }
f5436bfc 71 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 72 if (!pathname) {
f5436bfc 73 PERROR("lttng_strndup");
bb3c4e70
MD
74 goto error;
75 }
ffe60014
DG
76 DBG3("Kernel network consumer subdir path: %s", pathname);
77 }
78
2bba9e53
DG
79 return pathname;
80
81error:
82 free(pathname);
83 return NULL;
84}
85
86/*
87 * Sending a single channel to the consumer with command ADD_CHANNEL.
88 */
89int kernel_consumer_add_channel(struct consumer_socket *sock,
90 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
91 unsigned int monitor)
92{
93 int ret;
94 char *pathname;
95 struct lttcomm_consumer_msg lkm;
96 struct consumer_output *consumer;
97
98 /* Safety net */
99 assert(channel);
100 assert(session);
101 assert(session->consumer);
102
103 consumer = session->consumer;
104
105 DBG("Kernel consumer adding channel %s to kernel consumer",
106 channel->channel->name);
107
108 if (monitor) {
109 pathname = create_channel_path(consumer, session->uid, session->gid);
2bba9e53
DG
110 } else {
111 /* Empty path. */
53efb85a 112 pathname = strdup("");
2bba9e53 113 }
bb3c4e70
MD
114 if (!pathname) {
115 ret = -1;
116 goto error;
117 }
2bba9e53 118
00e2e675
DG
119 /* Prep channel message structure */
120 consumer_init_channel_comm_msg(&lkm,
121 LTTNG_CONSUMER_ADD_CHANNEL,
122 channel->fd,
ffe60014
DG
123 session->id,
124 pathname,
125 session->uid,
126 session->gid,
127 consumer->net_seq_index,
c30aaa51 128 channel->channel->name,
ffe60014
DG
129 channel->stream_count,
130 channel->channel->attr.output,
1624d5b7
JD
131 CONSUMER_CHANNEL_TYPE_DATA,
132 channel->channel->attr.tracefile_size,
2bba9e53 133 channel->channel->attr.tracefile_count,
ecc48a90
JD
134 monitor,
135 channel->channel->attr.live_timer_interval);
00e2e675 136
840cb59c 137 health_code_update();
ca03de58 138
00e2e675
DG
139 ret = consumer_send_channel(sock, &lkm);
140 if (ret < 0) {
141 goto error;
142 }
143
840cb59c 144 health_code_update();
ca03de58 145
00e2e675 146error:
53efb85a 147 free(pathname);
00e2e675
DG
148 return ret;
149}
150
151/*
152 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
ba0fd12e
JG
153 *
154 * The consumer socket lock must be held by the caller.
00e2e675 155 */
f50f23d9 156int kernel_consumer_add_metadata(struct consumer_socket *sock,
2bba9e53 157 struct ltt_kernel_session *session, unsigned int monitor)
00e2e675
DG
158{
159 int ret;
2bba9e53 160 char *pathname;
00e2e675 161 struct lttcomm_consumer_msg lkm;
a7d9a3e7 162 struct consumer_output *consumer;
00e2e675
DG
163
164 /* Safety net */
165 assert(session);
166 assert(session->consumer);
f50f23d9 167 assert(sock);
00e2e675
DG
168
169 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
170
171 /* Get consumer output pointer */
a7d9a3e7 172 consumer = session->consumer;
00e2e675 173
2bba9e53
DG
174 if (monitor) {
175 pathname = create_channel_path(consumer, session->uid, session->gid);
00e2e675 176 } else {
2bba9e53 177 /* Empty path. */
53efb85a 178 pathname = strdup("");
00e2e675 179 }
bb3c4e70
MD
180 if (!pathname) {
181 ret = -1;
182 goto error;
183 }
00e2e675
DG
184
185 /* Prep channel message structure */
186 consumer_init_channel_comm_msg(&lkm,
187 LTTNG_CONSUMER_ADD_CHANNEL,
188 session->metadata->fd,
ffe60014
DG
189 session->id,
190 pathname,
191 session->uid,
192 session->gid,
193 consumer->net_seq_index,
30079b6b 194 DEFAULT_METADATA_NAME,
ffe60014
DG
195 1,
196 DEFAULT_KERNEL_CHANNEL_OUTPUT,
1624d5b7 197 CONSUMER_CHANNEL_TYPE_METADATA,
2bba9e53 198 0, 0,
ecc48a90 199 monitor, 0);
00e2e675 200
840cb59c 201 health_code_update();
ca03de58 202
00e2e675
DG
203 ret = consumer_send_channel(sock, &lkm);
204 if (ret < 0) {
205 goto error;
206 }
207
840cb59c 208 health_code_update();
ca03de58 209
00e2e675
DG
210 /* Prep stream message structure */
211 consumer_init_stream_comm_msg(&lkm,
212 LTTNG_CONSUMER_ADD_STREAM,
213 session->metadata->fd,
214 session->metadata_stream_fd,
1624d5b7 215 0); /* CPU: 0 for metadata. */
00e2e675 216
840cb59c 217 health_code_update();
ca03de58 218
00e2e675 219 /* Send stream and file descriptor */
a7d9a3e7 220 ret = consumer_send_stream(sock, consumer, &lkm,
00e2e675
DG
221 &session->metadata_stream_fd, 1);
222 if (ret < 0) {
223 goto error;
224 }
225
840cb59c 226 health_code_update();
ca03de58 227
00e2e675 228error:
53efb85a 229 free(pathname);
00e2e675
DG
230 return ret;
231}
232
233/*
234 * Sending a single stream to the consumer with command ADD_STREAM.
235 */
f50f23d9
DG
236int kernel_consumer_add_stream(struct consumer_socket *sock,
237 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
2bba9e53 238 struct ltt_kernel_session *session, unsigned int monitor)
00e2e675
DG
239{
240 int ret;
00e2e675 241 struct lttcomm_consumer_msg lkm;
a7d9a3e7 242 struct consumer_output *consumer;
00e2e675
DG
243
244 assert(channel);
245 assert(stream);
246 assert(session);
247 assert(session->consumer);
f50f23d9 248 assert(sock);
00e2e675
DG
249
250 DBG("Sending stream %d of channel %s to kernel consumer",
251 stream->fd, channel->channel->name);
252
253 /* Get consumer output pointer */
a7d9a3e7 254 consumer = session->consumer;
00e2e675 255
00e2e675 256 /* Prep stream consumer message */
ffe60014
DG
257 consumer_init_stream_comm_msg(&lkm,
258 LTTNG_CONSUMER_ADD_STREAM,
00e2e675
DG
259 channel->fd,
260 stream->fd,
ffe60014 261 stream->cpu);
00e2e675 262
840cb59c 263 health_code_update();
ca03de58 264
00e2e675 265 /* Send stream and file descriptor */
a7d9a3e7 266 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
267 if (ret < 0) {
268 goto error;
269 }
270
840cb59c 271 health_code_update();
ca03de58 272
00e2e675
DG
273error:
274 return ret;
275}
276
a4baae1b
JD
277/*
278 * Sending the notification that all streams were sent with STREAMS_SENT.
279 */
280int kernel_consumer_streams_sent(struct consumer_socket *sock,
281 struct ltt_kernel_session *session, uint64_t channel_key)
282{
283 int ret;
284 struct lttcomm_consumer_msg lkm;
285 struct consumer_output *consumer;
286
287 assert(sock);
288 assert(session);
289
290 DBG("Sending streams_sent");
291 /* Get consumer output pointer */
292 consumer = session->consumer;
293
294 /* Prep stream consumer message */
295 consumer_init_streams_sent_comm_msg(&lkm,
296 LTTNG_CONSUMER_STREAMS_SENT,
297 channel_key, consumer->net_seq_index);
298
299 health_code_update();
300
301 /* Send stream and file descriptor */
302 ret = consumer_send_msg(sock, &lkm);
303 if (ret < 0) {
304 goto error;
305 }
306
307error:
308 return ret;
309}
310
f1e16794
DG
311/*
312 * Send all stream fds of kernel channel to the consumer.
ba0fd12e
JG
313 *
314 * The consumer socket lock must be held by the caller.
f1e16794 315 */
f50f23d9 316int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
2bba9e53
DG
317 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
318 unsigned int monitor)
f1e16794 319{
6c598487 320 int ret = LTTNG_OK;
f1e16794 321 struct ltt_kernel_stream *stream;
00e2e675
DG
322
323 /* Safety net */
324 assert(channel);
325 assert(session);
326 assert(session->consumer);
f50f23d9 327 assert(sock);
00e2e675
DG
328
329 /* Bail out if consumer is disabled */
330 if (!session->consumer->enabled) {
f73fabfd 331 ret = LTTNG_OK;
00e2e675
DG
332 goto error;
333 }
f1e16794
DG
334
335 DBG("Sending streams of channel %s to kernel consumer",
336 channel->channel->name);
337
6c598487
MD
338 if (!channel->sent_to_consumer) {
339 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
340 if (ret < 0) {
341 goto error;
342 }
343 channel->sent_to_consumer = true;
f1e16794
DG
344 }
345
346 /* Send streams */
347 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
b64dd54f 348 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
349 continue;
350 }
00e2e675
DG
351
352 /* Add stream on the kernel consumer side. */
2bba9e53
DG
353 ret = kernel_consumer_add_stream(sock, channel, stream, session,
354 monitor);
f1e16794 355 if (ret < 0) {
f1e16794
DG
356 goto error;
357 }
b64dd54f 358 stream->sent_to_consumer = true;
f1e16794
DG
359 }
360
f1e16794
DG
361error:
362 return ret;
363}
364
365/*
366 * Send all stream fds of the kernel session to the consumer.
ba0fd12e
JG
367 *
368 * The consumer socket lock must be held by the caller.
f1e16794 369 */
f50f23d9
DG
370int kernel_consumer_send_session(struct consumer_socket *sock,
371 struct ltt_kernel_session *session)
f1e16794 372{
2bba9e53 373 int ret, monitor = 0;
f1e16794 374 struct ltt_kernel_channel *chan;
f1e16794 375
00e2e675
DG
376 /* Safety net */
377 assert(session);
378 assert(session->consumer);
f50f23d9 379 assert(sock);
f1e16794 380
00e2e675
DG
381 /* Bail out if consumer is disabled */
382 if (!session->consumer->enabled) {
f73fabfd 383 ret = LTTNG_OK;
00e2e675 384 goto error;
f1e16794
DG
385 }
386
2bba9e53
DG
387 /* Don't monitor the streams on the consumer if in flight recorder. */
388 if (session->output_traces) {
389 monitor = 1;
390 }
391
00e2e675
DG
392 DBG("Sending session stream to kernel consumer");
393
609af759 394 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 395 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 396 if (ret < 0) {
f1e16794
DG
397 goto error;
398 }
f1e16794
DG
399 }
400
00e2e675 401 /* Send channel and streams of it */
f1e16794 402 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
2bba9e53
DG
403 ret = kernel_consumer_send_channel_stream(sock, chan, session,
404 monitor);
f1e16794
DG
405 if (ret < 0) {
406 goto error;
407 }
601262d6
JD
408 if (monitor) {
409 /*
410 * Inform the relay that all the streams for the
411 * channel were sent.
412 */
413 ret = kernel_consumer_streams_sent(sock, session, chan->fd);
414 if (ret < 0) {
415 goto error;
416 }
417 }
f1e16794
DG
418 }
419
00e2e675 420 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 421
4ce9ff51 422 session->consumer_fds_sent = 1;
f1e16794
DG
423 return 0;
424
425error:
426 return ret;
427}
07b86b52
JD
428
429int kernel_consumer_destroy_channel(struct consumer_socket *socket,
430 struct ltt_kernel_channel *channel)
431{
432 int ret;
433 struct lttcomm_consumer_msg msg;
434
435 assert(channel);
436 assert(socket);
07b86b52
JD
437
438 DBG("Sending kernel consumer destroy channel key %d", channel->fd);
439
53efb85a 440 memset(&msg, 0, sizeof(msg));
07b86b52
JD
441 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
442 msg.u.destroy_channel.key = channel->fd;
443
444 pthread_mutex_lock(socket->lock);
445 health_code_update();
446
447 ret = consumer_send_msg(socket, &msg);
448 if (ret < 0) {
449 goto error;
450 }
451
452error:
453 health_code_update();
454 pthread_mutex_unlock(socket->lock);
455 return ret;
456}
457
458int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
459 struct ltt_kernel_metadata *metadata)
460{
461 int ret;
462 struct lttcomm_consumer_msg msg;
463
464 assert(metadata);
465 assert(socket);
07b86b52
JD
466
467 DBG("Sending kernel consumer destroy channel key %d", metadata->fd);
468
53efb85a 469 memset(&msg, 0, sizeof(msg));
07b86b52
JD
470 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
471 msg.u.destroy_channel.key = metadata->fd;
472
473 pthread_mutex_lock(socket->lock);
474 health_code_update();
475
476 ret = consumer_send_msg(socket, &msg);
477 if (ret < 0) {
478 goto error;
479 }
480
481error:
482 health_code_update();
483 pthread_mutex_unlock(socket->lock);
484 return ret;
485}
This page took 0.0777 seconds and 5 git commands to generate.