Initial import of the new binary lttng-relayd
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
d14d33bf
AM
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
3bd1e081
MD
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
d14d33bf
AM
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
3bd1e081
MD
21#include <poll.h>
22#include <pthread.h>
23#include <stdlib.h>
24#include <string.h>
25#include <sys/mman.h>
26#include <sys/socket.h>
27#include <sys/types.h>
28#include <unistd.h>
dbb5dfe6 29#include <sys/stat.h>
3bd1e081 30
990570ed 31#include <common/common.h>
10a8a223 32#include <common/kernel-ctl/kernel-ctl.h>
10a8a223 33#include <common/sessiond-comm/sessiond-comm.h>
dbb5dfe6 34#include <common/compat/fcntl.h>
0857097f 35
10a8a223 36#include "kernel-consumer.h"
3bd1e081
MD
37
38extern struct lttng_consumer_global_data consumer_data;
39extern int consumer_poll_timeout;
40extern volatile int consumer_quit;
41
42/*
43 * Mmap the ring buffer, read it and write the data to the tracefile.
44 *
45 * Returns the number of bytes written
46 */
4078b776 47ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
3bd1e081
MD
48 struct lttng_consumer_local_data *ctx,
49 struct lttng_consumer_stream *stream, unsigned long len)
50{
51 unsigned long mmap_offset;
47e81c02 52 ssize_t ret = 0, written = 0;
3bd1e081
MD
53 off_t orig_offset = stream->out_fd_offset;
54 int fd = stream->wait_fd;
55 int outfd = stream->out_fd;
56
57 /* get the offset inside the fd to mmap */
58 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
59 if (ret != 0) {
87dc6a9c 60 errno = -ret;
3bd1e081 61 perror("kernctl_get_mmap_read_offset");
47e81c02 62 written = ret;
3bd1e081
MD
63 goto end;
64 }
65
66 while (len > 0) {
67 ret = write(outfd, stream->mmap_base + mmap_offset, len);
47e81c02
MD
68 if (ret < 0) {
69 if (errno == EINTR) {
70 /* restart the interrupted system call */
71 continue;
72 } else {
73 perror("Error in file write");
74 if (written == 0) {
75 written = ret;
76 }
77 goto end;
78 }
79 } else if (ret > len) {
3bd1e081 80 perror("Error in file write");
47e81c02 81 written += ret;
3bd1e081 82 goto end;
47e81c02
MD
83 } else {
84 len -= ret;
85 mmap_offset += ret;
3bd1e081
MD
86 }
87 /* This won't block, but will start writeout asynchronously */
dbb5dfe6 88 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
3bd1e081
MD
89 SYNC_FILE_RANGE_WRITE);
90 stream->out_fd_offset += ret;
47e81c02 91 written += ret;
3bd1e081 92 }
3bd1e081 93 lttng_consumer_sync_trace_file(stream, orig_offset);
3bd1e081 94end:
47e81c02 95 return written;
3bd1e081
MD
96}
97
98/*
99 * Splice the data from the ring buffer to the tracefile.
100 *
101 * Returns the number of bytes spliced.
102 */
4078b776 103ssize_t lttng_kconsumer_on_read_subbuffer_splice(
3bd1e081
MD
104 struct lttng_consumer_local_data *ctx,
105 struct lttng_consumer_stream *stream, unsigned long len)
106{
47e81c02 107 ssize_t ret = 0, written = 0;
3bd1e081
MD
108 loff_t offset = 0;
109 off_t orig_offset = stream->out_fd_offset;
110 int fd = stream->wait_fd;
111 int outfd = stream->out_fd;
112
113 while (len > 0) {
114 DBG("splice chan to pipe offset %lu (fd : %d)",
115 (unsigned long)offset, fd);
116 ret = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
117 SPLICE_F_MOVE | SPLICE_F_MORE);
630543f2 118 DBG("splice chan to pipe ret %zd", ret);
3bd1e081 119 if (ret < 0) {
3bd1e081 120 perror("Error in relay splice");
47e81c02
MD
121 if (written == 0) {
122 written = ret;
123 }
124 ret = errno;
3bd1e081
MD
125 goto splice_error;
126 }
127
128 ret = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, ret,
129 SPLICE_F_MOVE | SPLICE_F_MORE);
630543f2 130 DBG("splice pipe to file %zd", ret);
3bd1e081 131 if (ret < 0) {
3bd1e081 132 perror("Error in file splice");
47e81c02
MD
133 if (written == 0) {
134 written = ret;
135 }
136 ret = errno;
137 goto splice_error;
138 }
139 if (ret > len) {
140 errno = EINVAL;
141 perror("Wrote more data than requested");
142 written += ret;
143 ret = errno;
3bd1e081
MD
144 goto splice_error;
145 }
146 len -= ret;
147 /* This won't block, but will start writeout asynchronously */
dbb5dfe6 148 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
3bd1e081
MD
149 SYNC_FILE_RANGE_WRITE);
150 stream->out_fd_offset += ret;
47e81c02 151 written += ret;
3bd1e081
MD
152 }
153 lttng_consumer_sync_trace_file(stream, orig_offset);
154
155 goto end;
156
157splice_error:
158 /* send the appropriate error description to sessiond */
47e81c02 159 switch (ret) {
3bd1e081
MD
160 case EBADF:
161 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
162 break;
163 case EINVAL:
164 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
165 break;
166 case ENOMEM:
167 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
168 break;
169 case ESPIPE:
170 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
171 break;
172 }
173
174end:
47e81c02 175 return written;
3bd1e081
MD
176}
177
178/*
179 * Take a snapshot for a specific fd
180 *
181 * Returns 0 on success, < 0 on error
182 */
183int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
184 struct lttng_consumer_stream *stream)
185{
186 int ret = 0;
187 int infd = stream->wait_fd;
188
189 ret = kernctl_snapshot(infd);
190 if (ret != 0) {
87dc6a9c 191 errno = -ret;
3bd1e081
MD
192 perror("Getting sub-buffer snapshot.");
193 }
194
195 return ret;
196}
197
198/*
199 * Get the produced position
200 *
201 * Returns 0 on success, < 0 on error
202 */
203int lttng_kconsumer_get_produced_snapshot(
204 struct lttng_consumer_local_data *ctx,
205 struct lttng_consumer_stream *stream,
206 unsigned long *pos)
207{
208 int ret;
209 int infd = stream->wait_fd;
210
211 ret = kernctl_snapshot_get_produced(infd, pos);
212 if (ret != 0) {
87dc6a9c 213 errno = -ret;
3bd1e081
MD
214 perror("kernctl_snapshot_get_produced");
215 }
216
217 return ret;
218}
219
220int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
221 int sock, struct pollfd *consumer_sockpoll)
222{
223 ssize_t ret;
224 struct lttcomm_consumer_msg msg;
225
226 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
227 if (ret != sizeof(msg)) {
f2fc6720 228 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD);
3bd1e081
MD
229 return ret;
230 }
231 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
232 return -ENOENT;
233 }
234
235 switch (msg.cmd_type) {
236 case LTTNG_CONSUMER_ADD_CHANNEL:
237 {
238 struct lttng_consumer_channel *new_channel;
239
240 DBG("consumer_add_channel %d", msg.u.channel.channel_key);
241 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
242 -1, -1,
243 msg.u.channel.mmap_len,
244 msg.u.channel.max_sb_size);
245 if (new_channel == NULL) {
246 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
247 goto end_nosignal;
248 }
249 if (ctx->on_recv_channel != NULL) {
250 ret = ctx->on_recv_channel(new_channel);
251 if (ret == 0) {
252 consumer_add_channel(new_channel);
253 } else if (ret < 0) {
254 goto end_nosignal;
255 }
256 } else {
257 consumer_add_channel(new_channel);
258 }
259 goto end_nosignal;
260 }
261 case LTTNG_CONSUMER_ADD_STREAM:
262 {
263 struct lttng_consumer_stream *new_stream;
f2fc6720 264 int fd;
3bd1e081
MD
265
266 /* block */
267 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
268 return -EINTR;
269 }
f2fc6720
MD
270 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
271 if (ret != sizeof(fd)) {
3bd1e081
MD
272 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
273 return ret;
274 }
3bd1e081 275
f2fc6720
MD
276 DBG("consumer_add_stream %s (%d)", msg.u.stream.path_name,
277 fd);
3bd1e081
MD
278 new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
279 msg.u.stream.stream_key,
f2fc6720 280 fd, fd,
3bd1e081
MD
281 msg.u.stream.state,
282 msg.u.stream.mmap_len,
283 msg.u.stream.output,
6df2e2c9
MD
284 msg.u.stream.path_name,
285 msg.u.stream.uid,
286 msg.u.stream.gid);
3bd1e081
MD
287 if (new_stream == NULL) {
288 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
289 goto end;
290 }
291 if (ctx->on_recv_stream != NULL) {
292 ret = ctx->on_recv_stream(new_stream);
293 if (ret == 0) {
294 consumer_add_stream(new_stream);
295 } else if (ret < 0) {
296 goto end;
297 }
298 } else {
299 consumer_add_stream(new_stream);
300 }
301 break;
302 }
303 case LTTNG_CONSUMER_UPDATE_STREAM:
304 {
305 if (ctx->on_update_stream != NULL) {
306 ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
307 if (ret == 0) {
308 consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
309 } else if (ret < 0) {
310 goto end;
311 }
312 } else {
313 consumer_change_stream_state(msg.u.stream.stream_key,
314 msg.u.stream.state);
315 }
316 break;
317 }
318 default:
319 break;
320 }
321end:
04fdd819
MD
322 /*
323 * Wake-up the other end by writing a null byte in the pipe
324 * (non-blocking). Important note: Because writing into the
325 * pipe is non-blocking (and therefore we allow dropping wakeup
326 * data, as long as there is wakeup data present in the pipe
327 * buffer to wake up the other end), the other end should
328 * perform the following sequence for waiting:
329 * 1) empty the pipe (reads).
330 * 2) perform update operation.
331 * 3) wait on the pipe (poll).
332 */
333 do {
334 ret = write(ctx->consumer_poll_pipe[1], "", 1);
335 } while (ret == -1UL && errno == EINTR);
3bd1e081
MD
336end_nosignal:
337 return 0;
338}
d41f73b7
MD
339
340/*
341 * Consume data on a file descriptor and write it on a trace file.
342 */
4078b776 343ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
344 struct lttng_consumer_local_data *ctx)
345{
346 unsigned long len;
347 int err;
4078b776 348 ssize_t ret = 0;
d41f73b7
MD
349 int infd = stream->wait_fd;
350
351 DBG("In read_subbuffer (infd : %d)", infd);
352 /* Get the next subbuffer */
353 err = kernctl_get_next_subbuf(infd);
354 if (err != 0) {
d41f73b7
MD
355 /*
356 * This is a debug message even for single-threaded consumer,
357 * because poll() have more relaxed criterions than get subbuf,
358 * so get_subbuf may fail for short race windows where poll()
359 * would issue wakeups.
360 */
361 DBG("Reserving sub buffer failed (everything is normal, "
362 "it is due to concurrency)");
363 goto end;
364 }
365
366 switch (stream->output) {
367 case LTTNG_EVENT_SPLICE:
368 /* read the whole subbuffer */
369 err = kernctl_get_padded_subbuf_size(infd, &len);
370 if (err != 0) {
87dc6a9c 371 errno = -ret;
d41f73b7
MD
372 perror("Getting sub-buffer len failed.");
373 goto end;
374 }
375
376 /* splice the subbuffer to the tracefile */
377 ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
47e81c02 378 if (ret != len) {
d41f73b7
MD
379 /*
380 * display the error but continue processing to try
381 * to release the subbuffer
382 */
383 ERR("Error splicing to tracefile");
384 }
47e81c02 385
d41f73b7
MD
386 break;
387 case LTTNG_EVENT_MMAP:
388 /* read the used subbuffer size */
389 err = kernctl_get_padded_subbuf_size(infd, &len);
390 if (err != 0) {
87dc6a9c 391 errno = -ret;
d41f73b7
MD
392 perror("Getting sub-buffer len failed.");
393 goto end;
394 }
395 /* write the subbuffer to the tracefile */
396 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
47e81c02 397 if (ret != len) {
d41f73b7
MD
398 /*
399 * display the error but continue processing to try
400 * to release the subbuffer
401 */
402 ERR("Error writing to tracefile");
403 }
404 break;
405 default:
406 ERR("Unknown output method");
407 ret = -1;
408 }
409
410 err = kernctl_put_next_subbuf(infd);
411 if (err != 0) {
87dc6a9c 412 errno = -ret;
d41f73b7
MD
413 if (errno == EFAULT) {
414 perror("Error in unreserving sub buffer\n");
415 } else if (errno == EIO) {
416 /* Should never happen with newer LTTng versions */
417 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
418 }
419 goto end;
420 }
421
422end:
423 return ret;
424}
425
426int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
427{
428 int ret;
429
430 /* Opening the tracefile in write mode */
431 if (stream->path_name != NULL) {
e11d277b 432 ret = run_as_open(stream->path_name,
60b6c79c
MD
433 O_WRONLY|O_CREAT|O_TRUNC,
434 S_IRWXU|S_IRWXG|S_IRWXO,
435 stream->uid, stream->gid);
d41f73b7
MD
436 if (ret < 0) {
437 ERR("Opening %s", stream->path_name);
438 perror("open");
439 goto error;
440 }
441 stream->out_fd = ret;
442 }
443
444 if (stream->output == LTTNG_EVENT_MMAP) {
445 /* get the len of the mmap region */
446 unsigned long mmap_len;
447
448 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
449 if (ret != 0) {
87dc6a9c 450 errno = -ret;
d41f73b7
MD
451 perror("kernctl_get_mmap_len");
452 goto error_close_fd;
453 }
454 stream->mmap_len = (size_t) mmap_len;
455
456 stream->mmap_base = mmap(NULL, stream->mmap_len,
457 PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
458 if (stream->mmap_base == MAP_FAILED) {
459 perror("Error mmaping");
460 ret = -1;
461 goto error_close_fd;
462 }
463 }
464
465 /* we return 0 to let the library handle the FD internally */
466 return 0;
467
468error_close_fd:
469 {
470 int err;
471
472 err = close(stream->out_fd);
473 assert(!err);
474 }
475error:
476 return ret;
477}
478
This page took 0.054415 seconds and 5 git commands to generate.