Initial import of the new binary lttng-relayd
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
d14d33bf
AM
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
3bd1e081
MD
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
d14d33bf
AM
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
3bd1e081
MD
21#include <poll.h>
22#include <pthread.h>
23#include <stdlib.h>
24#include <string.h>
25#include <sys/mman.h>
26#include <sys/socket.h>
dbb5dfe6 27#include <sys/stat.h>
3bd1e081
MD
28#include <sys/types.h>
29#include <unistd.h>
9df8df5e 30#include <lttng/ust-ctl.h>
0857097f 31
990570ed 32#include <common/common.h>
10a8a223 33#include <common/sessiond-comm/sessiond-comm.h>
dbb5dfe6 34#include <common/compat/fcntl.h>
10a8a223
DG
35
36#include "ust-consumer.h"
3bd1e081
MD
37
38extern struct lttng_consumer_global_data consumer_data;
39extern int consumer_poll_timeout;
40extern volatile int consumer_quit;
41
42/*
43 * Mmap the ring buffer, read it and write the data to the tracefile.
44 *
4078b776 45 * Returns the number of bytes written, else negative value on error.
3bd1e081 46 */
4078b776 47ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
3bd1e081
MD
48 struct lttng_consumer_local_data *ctx,
49 struct lttng_consumer_stream *stream, unsigned long len)
50{
51 unsigned long mmap_offset;
47e81c02 52 long ret = 0, written = 0;
3bd1e081
MD
53 off_t orig_offset = stream->out_fd_offset;
54 int outfd = stream->out_fd;
55
56 /* get the offset inside the fd to mmap */
57 ret = ustctl_get_mmap_read_offset(stream->chan->handle,
58 stream->buf, &mmap_offset);
59 if (ret != 0) {
87dc6a9c 60 errno = -ret;
4c462e79 61 PERROR("ustctl_get_mmap_read_offset");
47e81c02 62 written = ret;
3bd1e081
MD
63 goto end;
64 }
65 while (len > 0) {
66 ret = write(outfd, stream->mmap_base + mmap_offset, len);
47e81c02
MD
67 if (ret < 0) {
68 if (errno == EINTR) {
69 /* restart the interrupted system call */
70 continue;
71 } else {
72 PERROR("Error in file write");
73 if (written == 0) {
74 written = ret;
75 }
76 goto end;
77 }
78 } else if (ret > len) {
4c462e79 79 PERROR("Error in file write");
47e81c02 80 written += ret;
3bd1e081 81 goto end;
47e81c02
MD
82 } else {
83 len -= ret;
84 mmap_offset += ret;
3bd1e081
MD
85 }
86 /* This won't block, but will start writeout asynchronously */
dbb5dfe6 87 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
3bd1e081
MD
88 SYNC_FILE_RANGE_WRITE);
89 stream->out_fd_offset += ret;
47e81c02 90 written += ret;
3bd1e081 91 }
3bd1e081 92 lttng_consumer_sync_trace_file(stream, orig_offset);
3bd1e081 93end:
47e81c02 94 return written;
3bd1e081
MD
95}
96
/*
 * Splice the data from the ring buffer to the tracefile.
 *
 * Not implemented for the UST consumer: the arguments are ignored and the
 * function always returns -ENOSYS.
 */
ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len)
{
	/* Parameters are intentionally unused. */
	(void) ctx;
	(void) stream;
	(void) len;

	return -ENOSYS;
}
108
109/*
110 * Take a snapshot for a specific fd
111 *
112 * Returns 0 on success, < 0 on error
113 */
114int lttng_ustconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
115 struct lttng_consumer_stream *stream)
116{
117 int ret = 0;
118
119 ret = ustctl_snapshot(stream->chan->handle, stream->buf);
120 if (ret != 0) {
87dc6a9c 121 errno = -ret;
4c462e79 122 PERROR("Getting sub-buffer snapshot.");
3bd1e081
MD
123 }
124
125 return ret;
126}
127
128/*
129 * Get the produced position
130 *
131 * Returns 0 on success, < 0 on error
132 */
133int lttng_ustconsumer_get_produced_snapshot(
134 struct lttng_consumer_local_data *ctx,
135 struct lttng_consumer_stream *stream,
136 unsigned long *pos)
137{
138 int ret;
139
140 ret = ustctl_snapshot_get_produced(stream->chan->handle,
141 stream->buf, pos);
142 if (ret != 0) {
87dc6a9c 143 errno = -ret;
4c462e79 144 PERROR("kernctl_snapshot_get_produced");
3bd1e081
MD
145 }
146
147 return ret;
148}
149
150int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
151 int sock, struct pollfd *consumer_sockpoll)
152{
153 ssize_t ret;
154 struct lttcomm_consumer_msg msg;
155
156 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
157 if (ret != sizeof(msg)) {
158 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
159 return ret;
160 }
161 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
162 return -ENOENT;
163 }
164
165 switch (msg.cmd_type) {
166 case LTTNG_CONSUMER_ADD_CHANNEL:
167 {
168 struct lttng_consumer_channel *new_channel;
169 int fds[1];
170 size_t nb_fd = 1;
171
172 /* block */
173 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
174 return -EINTR;
175 }
176 ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
177 if (ret != sizeof(fds)) {
178 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
179 return ret;
180 }
181
182 DBG("consumer_add_channel %d", msg.u.channel.channel_key);
183
184 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
185 fds[0], -1,
186 msg.u.channel.mmap_len,
187 msg.u.channel.max_sb_size);
188 if (new_channel == NULL) {
189 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
190 goto end_nosignal;
191 }
192 if (ctx->on_recv_channel != NULL) {
193 ret = ctx->on_recv_channel(new_channel);
194 if (ret == 0) {
195 consumer_add_channel(new_channel);
196 } else if (ret < 0) {
197 goto end_nosignal;
198 }
199 } else {
200 consumer_add_channel(new_channel);
201 }
202 goto end_nosignal;
203 }
204 case LTTNG_CONSUMER_ADD_STREAM:
205 {
206 struct lttng_consumer_stream *new_stream;
207 int fds[2];
208 size_t nb_fd = 2;
209
210 /* block */
211 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
212 return -EINTR;
213 }
214 ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
215 if (ret != sizeof(fds)) {
216 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
217 return ret;
218 }
219
220 DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
221 fds[0], fds[1]);
d41f73b7 222 assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
7ad0a0cb 223 new_stream = consumer_allocate_stream(msg.u.channel.channel_key,
3bd1e081
MD
224 msg.u.stream.stream_key,
225 fds[0], fds[1],
226 msg.u.stream.state,
227 msg.u.stream.mmap_len,
228 msg.u.stream.output,
6df2e2c9
MD
229 msg.u.stream.path_name,
230 msg.u.stream.uid,
231 msg.u.stream.gid);
3bd1e081
MD
232 if (new_stream == NULL) {
233 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
234 goto end;
235 }
236 if (ctx->on_recv_stream != NULL) {
237 ret = ctx->on_recv_stream(new_stream);
238 if (ret == 0) {
239 consumer_add_stream(new_stream);
240 } else if (ret < 0) {
241 goto end;
242 }
243 } else {
244 consumer_add_stream(new_stream);
245 }
246 break;
247 }
248 case LTTNG_CONSUMER_UPDATE_STREAM:
249 {
7ad0a0cb
MD
250 return -ENOSYS;
251#if 0
3bd1e081
MD
252 if (ctx->on_update_stream != NULL) {
253 ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
254 if (ret == 0) {
255 consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
256 } else if (ret < 0) {
257 goto end;
258 }
259 } else {
260 consumer_change_stream_state(msg.u.stream.stream_key,
261 msg.u.stream.state);
262 }
7ad0a0cb 263#endif
3bd1e081
MD
264 break;
265 }
266 default:
267 break;
268 }
269end:
04fdd819
MD
270 /*
271 * Wake-up the other end by writing a null byte in the pipe
272 * (non-blocking). Important note: Because writing into the
273 * pipe is non-blocking (and therefore we allow dropping wakeup
274 * data, as long as there is wakeup data present in the pipe
275 * buffer to wake up the other end), the other end should
276 * perform the following sequence for waiting:
277 * 1) empty the pipe (reads).
278 * 2) perform update operation.
279 * 3) wait on the pipe (poll).
280 */
281 do {
282 ret = write(ctx->consumer_poll_pipe[1], "", 1);
283 } while (ret == -1UL && errno == EINTR);
3bd1e081
MD
284end_nosignal:
285 return 0;
286}
287
288int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
289{
13161846 290 struct lttng_ust_object_data obj;
3bd1e081
MD
291
292 obj.handle = -1;
293 obj.shm_fd = chan->shm_fd;
294 obj.wait_fd = chan->wait_fd;
295 obj.memory_map_size = chan->mmap_len;
296 chan->handle = ustctl_map_channel(&obj);
297 if (!chan->handle) {
298 return -ENOMEM;
299 }
b5c5fc29 300 chan->wait_fd_is_copy = 1;
2c1dd183 301 chan->shm_fd = -1;
b5c5fc29 302
3bd1e081
MD
303 return 0;
304}
305
d056b477
MD
306void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
307{
308 ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
effcf122 309 stream->hangup_flush_done = 1;
d056b477
MD
310}
311
3bd1e081
MD
312void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
313{
314 ustctl_unmap_channel(chan->handle);
315}
316
317int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
318{
13161846 319 struct lttng_ust_object_data obj;
3bd1e081
MD
320 int ret;
321
322 obj.handle = -1;
323 obj.shm_fd = stream->shm_fd;
324 obj.wait_fd = stream->wait_fd;
325 obj.memory_map_size = stream->mmap_len;
326 ret = ustctl_add_stream(stream->chan->handle, &obj);
327 if (ret)
328 return ret;
329 stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu);
330 if (!stream->buf)
331 return -EBUSY;
2c1dd183
MD
332 /* ustctl_open_stream_read has closed the shm fd. */
333 stream->wait_fd_is_copy = 1;
334 stream->shm_fd = -1;
335
3bd1e081
MD
336 stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf);
337 if (!stream->mmap_base) {
338 return -EINVAL;
339 }
ee77a7b0 340
3bd1e081
MD
341 return 0;
342}
343
344void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
345{
346 ustctl_close_stream_read(stream->chan->handle, stream->buf);
347}
d41f73b7
MD
348
349
350int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
351 struct lttng_consumer_local_data *ctx)
352{
353 unsigned long len;
354 int err;
355 long ret = 0;
356 struct lttng_ust_shm_handle *handle;
357 struct lttng_ust_lib_ring_buffer *buf;
358 char dummy;
359 ssize_t readlen;
360
361 DBG("In read_subbuffer (wait_fd: %d, stream key: %d)",
362 stream->wait_fd, stream->key);
363
364 /* We can consume the 1 byte written into the wait_fd by UST */
effcf122
MD
365 if (!stream->hangup_flush_done) {
366 do {
367 readlen = read(stream->wait_fd, &dummy, 1);
87dc6a9c 368 } while (readlen == -1 && errno == EINTR);
effcf122
MD
369 if (readlen == -1) {
370 ret = readlen;
371 goto end;
372 }
d41f73b7
MD
373 }
374
375 buf = stream->buf;
376 handle = stream->chan->handle;
377 /* Get the next subbuffer */
378 err = ustctl_get_next_subbuf(handle, buf);
379 if (err != 0) {
effcf122 380 ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
d41f73b7
MD
381 /*
382 * This is a debug message even for single-threaded consumer,
383 * because poll() have more relaxed criterions than get subbuf,
384 * so get_subbuf may fail for short race windows where poll()
385 * would issue wakeups.
386 */
387 DBG("Reserving sub buffer failed (everything is normal, "
388 "it is due to concurrency)");
389 goto end;
390 }
391 assert(stream->output == LTTNG_EVENT_MMAP);
392 /* read the used subbuffer size */
393 err = ustctl_get_padded_subbuf_size(handle, buf, &len);
effcf122 394 assert(err == 0);
d41f73b7
MD
395 /* write the subbuffer to the tracefile */
396 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
47e81c02 397 if (ret != len) {
d41f73b7
MD
398 /*
399 * display the error but continue processing to try
400 * to release the subbuffer
401 */
402 ERR("Error writing to tracefile");
403 }
404 err = ustctl_put_next_subbuf(handle, buf);
effcf122 405 assert(err == 0);
d41f73b7
MD
406end:
407 return ret;
408}
409
410int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
411{
412 int ret;
413
414 /* Opening the tracefile in write mode */
415 if (stream->path_name != NULL) {
e11d277b 416 ret = run_as_open(stream->path_name,
60b6c79c
MD
417 O_WRONLY|O_CREAT|O_TRUNC,
418 S_IRWXU|S_IRWXG|S_IRWXO,
419 stream->uid, stream->gid);
d41f73b7
MD
420 if (ret < 0) {
421 ERR("Opening %s", stream->path_name);
4c462e79 422 PERROR("open");
d41f73b7
MD
423 goto error;
424 }
425 stream->out_fd = ret;
426 }
427
428 /* we return 0 to let the library handle the FD internally */
429 return 0;
430
431error:
432 return ret;
433}
This page took 0.051721 seconds and 5 git commands to generate.