summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
6e74235)
With large subbuffer (packet) size, if write() returns before copying the
entire packet for mmap buffers, the consumerd restarts the write
infinitely, which is not good at all.
This affects both lttng-ust (in default mmap mode) and lttng-kernel (but
only for mmap buffers, which is not the default).
This issue would show up with large subbuffer size.
We need to handle this case, as well as EINTR errors (which need to restart
write).
Also fixing the return value of mmap read functions, which were returning
the amount of data written by the last invocation of write() rather than
the total number of bytes written. splice use had the same issue.
Also now consider a write() that returns more bytes than requested as an
error.
Moreover, assigning error = ret after failed splice and write was a
mistake: error is holding the actual error value. ret just holds -1.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
struct lttng_consumer_stream *stream, unsigned long len)
{
unsigned long mmap_offset;
struct lttng_consumer_stream *stream, unsigned long len)
{
unsigned long mmap_offset;
+ ssize_t ret = 0, written = 0;
off_t orig_offset = stream->out_fd_offset;
int fd = stream->wait_fd;
int outfd = stream->out_fd;
off_t orig_offset = stream->out_fd_offset;
int fd = stream->wait_fd;
int outfd = stream->out_fd;
if (ret != 0) {
errno = -ret;
perror("kernctl_get_mmap_read_offset");
if (ret != 0) {
errno = -ret;
perror("kernctl_get_mmap_read_offset");
goto end;
}
while (len > 0) {
ret = write(outfd, stream->mmap_base + mmap_offset, len);
goto end;
}
while (len > 0) {
ret = write(outfd, stream->mmap_base + mmap_offset, len);
- if (ret >= len) {
- len = 0;
- } else if (ret < 0) {
- errno = -ret;
+ if (ret < 0) {
+ if (errno == EINTR) {
+ /* restart the interrupted system call */
+ continue;
+ } else {
+ perror("Error in file write");
+ if (written == 0) {
+ written = ret;
+ }
+ goto end;
+ }
+ } else if (ret > len) {
perror("Error in file write");
perror("Error in file write");
+ } else {
+ len -= ret;
+ mmap_offset += ret;
}
/* This won't block, but will start writeout asynchronously */
lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret;
}
/* This won't block, but will start writeout asynchronously */
lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret;
lttng_consumer_sync_trace_file(stream, orig_offset);
lttng_consumer_sync_trace_file(stream, orig_offset);
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len)
{
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len)
{
+ ssize_t ret = 0, written = 0;
loff_t offset = 0;
off_t orig_offset = stream->out_fd_offset;
int fd = stream->wait_fd;
loff_t offset = 0;
off_t orig_offset = stream->out_fd_offset;
int fd = stream->wait_fd;
SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("splice chan to pipe ret %zd", ret);
if (ret < 0) {
SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("splice chan to pipe ret %zd", ret);
if (ret < 0) {
perror("Error in relay splice");
perror("Error in relay splice");
+ if (written == 0) {
+ written = ret;
+ }
+ ret = errno;
SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("splice pipe to file %zd", ret);
if (ret < 0) {
SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("splice pipe to file %zd", ret);
if (ret < 0) {
perror("Error in file splice");
perror("Error in file splice");
+ if (written == 0) {
+ written = ret;
+ }
+ ret = errno;
+ goto splice_error;
+ }
+ if (ret > len) {
+ errno = EINVAL;
+ perror("Wrote more data than requested");
+ written += ret;
+ ret = errno;
goto splice_error;
}
len -= ret;
goto splice_error;
}
len -= ret;
lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret;
lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret;
}
lttng_consumer_sync_trace_file(stream, orig_offset);
}
lttng_consumer_sync_trace_file(stream, orig_offset);
splice_error:
/* send the appropriate error description to sessiond */
splice_error:
/* send the appropriate error description to sessiond */
case EBADF:
lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
break;
case EBADF:
lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
break;
/* splice the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
/* splice the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
/*
* display the error but continue processing to try
* to release the subbuffer
*/
ERR("Error splicing to tracefile");
}
/*
* display the error but continue processing to try
* to release the subbuffer
*/
ERR("Error splicing to tracefile");
}
break;
case LTTNG_EVENT_MMAP:
/* read the used subbuffer size */
break;
case LTTNG_EVENT_MMAP:
/* read the used subbuffer size */
}
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
}
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
/*
* display the error but continue processing to try
* to release the subbuffer
/*
* display the error but continue processing to try
* to release the subbuffer
struct lttng_consumer_stream *stream, unsigned long len)
{
unsigned long mmap_offset;
struct lttng_consumer_stream *stream, unsigned long len)
{
unsigned long mmap_offset;
+ long ret = 0, written = 0;
off_t orig_offset = stream->out_fd_offset;
int outfd = stream->out_fd;
off_t orig_offset = stream->out_fd_offset;
int outfd = stream->out_fd;
if (ret != 0) {
errno = -ret;
PERROR("ustctl_get_mmap_read_offset");
if (ret != 0) {
errno = -ret;
PERROR("ustctl_get_mmap_read_offset");
goto end;
}
while (len > 0) {
ret = write(outfd, stream->mmap_base + mmap_offset, len);
goto end;
}
while (len > 0) {
ret = write(outfd, stream->mmap_base + mmap_offset, len);
- if (ret >= len) {
- len = 0;
- } else if (ret < 0) {
- errno = -ret;
+ if (ret < 0) {
+ if (errno == EINTR) {
+ /* restart the interrupted system call */
+ continue;
+ } else {
+ PERROR("Error in file write");
+ if (written == 0) {
+ written = ret;
+ }
+ goto end;
+ }
+ } else if (ret > len) {
PERROR("Error in file write");
PERROR("Error in file write");
+ } else {
+ len -= ret;
+ mmap_offset += ret;
}
/* This won't block, but will start writeout asynchronously */
lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret;
}
/* This won't block, but will start writeout asynchronously */
lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret;
lttng_consumer_sync_trace_file(stream, orig_offset);
lttng_consumer_sync_trace_file(stream, orig_offset);
assert(err == 0);
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
assert(err == 0);
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
/*
* display the error but continue processing to try
* to release the subbuffer
/*
* display the error but continue processing to try
* to release the subbuffer