Fix: mmap write() for large subbuffers and handle EINTR (v2)
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 6 Jul 2012 14:13:05 +0000 (10:13 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 6 Jul 2012 17:00:36 +0000 (13:00 -0400)
With large subbuffer (packet) size, if write() returns before copying the
entire packet for mmap buffers, the consumerd restarts the write
infinitely, which is not good at all.

This affects both lttng-ust (in default mmap mode) and lttng-kernel (but
only for mmap buffers, which is not the default).

This issue would show up with large subbuffer size.

We need to handle this case, as well as EINTR errors (which need to restart
write).

Also fixing the return value of mmap read functions, which were returning
the amount of data written by the last invocation of write() rather than
the total number of bytes written. splice use had the same issue.

Also now consider a write() that returns more bytes than requested as an
error.

Moreover, assigning error = ret after failed splice and write was a
mistake: error is holding the actual error value. ret just holds -1.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index bbc31f8e3be14bdb3927c87a1aabb40e28e777b5..3c9687306dc2aa2e904ef35f9954bdcc37e1d436 100644 (file)
@@ -49,7 +49,7 @@ ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
                struct lttng_consumer_stream *stream, unsigned long len)
 {
        unsigned long mmap_offset;
                struct lttng_consumer_stream *stream, unsigned long len)
 {
        unsigned long mmap_offset;
-       ssize_t ret = 0;
+       ssize_t ret = 0, written = 0;
        off_t orig_offset = stream->out_fd_offset;
        int fd = stream->wait_fd;
        int outfd = stream->out_fd;
        off_t orig_offset = stream->out_fd_offset;
        int fd = stream->wait_fd;
        int outfd = stream->out_fd;
@@ -59,30 +59,40 @@ ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
        if (ret != 0) {
                errno = -ret;
                perror("kernctl_get_mmap_read_offset");
        if (ret != 0) {
                errno = -ret;
                perror("kernctl_get_mmap_read_offset");
+               written = ret;
                goto end;
        }
 
        while (len > 0) {
                ret = write(outfd, stream->mmap_base + mmap_offset, len);
                goto end;
        }
 
        while (len > 0) {
                ret = write(outfd, stream->mmap_base + mmap_offset, len);
-               if (ret >= len) {
-                       len = 0;
-               } else if (ret < 0) {
-                       errno = -ret;
+               if (ret < 0) {
+                       if (errno == EINTR) {
+                               /* restart the interrupted system call */
+                               continue;
+                       } else {
+                               perror("Error in file write");
+                               if (written == 0) {
+                                       written = ret;
+                               }
+                               goto end;
+                       }
+               } else if (ret > len) {
                        perror("Error in file write");
                        perror("Error in file write");
+                       written += ret;
                        goto end;
                        goto end;
+               } else {
+                       len -= ret;
+                       mmap_offset += ret;
                }
                /* This won't block, but will start writeout asynchronously */
                lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
                                SYNC_FILE_RANGE_WRITE);
                stream->out_fd_offset += ret;
                }
                /* This won't block, but will start writeout asynchronously */
                lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
                                SYNC_FILE_RANGE_WRITE);
                stream->out_fd_offset += ret;
+               written += ret;
        }
        }
-
        lttng_consumer_sync_trace_file(stream, orig_offset);
        lttng_consumer_sync_trace_file(stream, orig_offset);
-
-       goto end;
-
 end:
 end:
-       return ret;
+       return written;
 }
 
 /*
 }
 
 /*
@@ -94,7 +104,7 @@ ssize_t lttng_kconsumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
-       ssize_t ret = 0;
+       ssize_t ret = 0, written = 0;
        loff_t offset = 0;
        off_t orig_offset = stream->out_fd_offset;
        int fd = stream->wait_fd;
        loff_t offset = 0;
        off_t orig_offset = stream->out_fd_offset;
        int fd = stream->wait_fd;
@@ -107,8 +117,11 @@ ssize_t lttng_kconsumer_on_read_subbuffer_splice(
                                SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice chan to pipe ret %zd", ret);
                if (ret < 0) {
                                SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice chan to pipe ret %zd", ret);
                if (ret < 0) {
-                       errno = -ret;
                        perror("Error in relay splice");
                        perror("Error in relay splice");
+                       if (written == 0) {
+                               written = ret;
+                       }
+                       ret = errno;
                        goto splice_error;
                }
 
                        goto splice_error;
                }
 
@@ -116,8 +129,18 @@ ssize_t lttng_kconsumer_on_read_subbuffer_splice(
                                SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice pipe to file %zd", ret);
                if (ret < 0) {
                                SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice pipe to file %zd", ret);
                if (ret < 0) {
-                       errno = -ret;
                        perror("Error in file splice");
                        perror("Error in file splice");
+                       if (written == 0) {
+                               written = ret;
+                       }
+                       ret = errno;
+                       goto splice_error;
+               }
+               if (ret > len) {
+                       errno = EINVAL;
+                       perror("Wrote more data than requested");
+                       written += ret;
+                       ret = errno;
                        goto splice_error;
                }
                len -= ret;
                        goto splice_error;
                }
                len -= ret;
@@ -125,6 +148,7 @@ ssize_t lttng_kconsumer_on_read_subbuffer_splice(
                lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
                                SYNC_FILE_RANGE_WRITE);
                stream->out_fd_offset += ret;
                lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
                                SYNC_FILE_RANGE_WRITE);
                stream->out_fd_offset += ret;
+               written += ret;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
@@ -132,7 +156,7 @@ ssize_t lttng_kconsumer_on_read_subbuffer_splice(
 
 splice_error:
        /* send the appropriate error description to sessiond */
 
 splice_error:
        /* send the appropriate error description to sessiond */
-       switch(ret) {
+       switch (ret) {
        case EBADF:
                lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
                break;
        case EBADF:
                lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
                break;
@@ -148,7 +172,7 @@ splice_error:
        }
 
 end:
        }
 
 end:
-       return ret;
+       return written;
 }
 
 /*
 }
 
 /*
@@ -351,13 +375,14 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
                        /* splice the subbuffer to the tracefile */
                        ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
 
                        /* splice the subbuffer to the tracefile */
                        ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
-                       if (ret < 0) {
+                       if (ret != len) {
                                /*
                                 * display the error but continue processing to try
                                 * to release the subbuffer
                                 */
                                ERR("Error splicing to tracefile");
                        }
                                /*
                                 * display the error but continue processing to try
                                 * to release the subbuffer
                                 */
                                ERR("Error splicing to tracefile");
                        }
+
                        break;
                case LTTNG_EVENT_MMAP:
                        /* read the used subbuffer size */
                        break;
                case LTTNG_EVENT_MMAP:
                        /* read the used subbuffer size */
@@ -369,7 +394,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        }
                        /* write the subbuffer to the tracefile */
                        ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
                        }
                        /* write the subbuffer to the tracefile */
                        ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
-                       if (ret < 0) {
+                       if (ret != len) {
                                /*
                                 * display the error but continue processing to try
                                 * to release the subbuffer
                                /*
                                 * display the error but continue processing to try
                                 * to release the subbuffer
index 2b55fd4637ead2b056fcafecc47ef5504d7c5df5..47c0d460ffb60cc612d9806325602c6eac459023 100644 (file)
@@ -49,7 +49,7 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
                struct lttng_consumer_stream *stream, unsigned long len)
 {
        unsigned long mmap_offset;
                struct lttng_consumer_stream *stream, unsigned long len)
 {
        unsigned long mmap_offset;
-       long ret = 0;
+       long ret = 0, written = 0;
        off_t orig_offset = stream->out_fd_offset;
        int outfd = stream->out_fd;
 
        off_t orig_offset = stream->out_fd_offset;
        int outfd = stream->out_fd;
 
@@ -59,29 +59,39 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
        if (ret != 0) {
                errno = -ret;
                PERROR("ustctl_get_mmap_read_offset");
        if (ret != 0) {
                errno = -ret;
                PERROR("ustctl_get_mmap_read_offset");
+               written = ret;
                goto end;
        }
        while (len > 0) {
                ret = write(outfd, stream->mmap_base + mmap_offset, len);
                goto end;
        }
        while (len > 0) {
                ret = write(outfd, stream->mmap_base + mmap_offset, len);
-               if (ret >= len) {
-                       len = 0;
-               } else if (ret < 0) {
-                       errno = -ret;
+               if (ret < 0) {
+                       if (errno == EINTR) {
+                               /* restart the interrupted system call */
+                               continue;
+                       } else {
+                               PERROR("Error in file write");
+                               if (written == 0) {
+                                       written = ret;
+                               }
+                               goto end;
+                       }
+               } else if (ret > len) {
                        PERROR("Error in file write");
                        PERROR("Error in file write");
+                       written += ret;
                        goto end;
                        goto end;
+               } else {
+                       len -= ret;
+                       mmap_offset += ret;
                }
                /* This won't block, but will start writeout asynchronously */
                lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
                                SYNC_FILE_RANGE_WRITE);
                stream->out_fd_offset += ret;
                }
                /* This won't block, but will start writeout asynchronously */
                lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
                                SYNC_FILE_RANGE_WRITE);
                stream->out_fd_offset += ret;
+               written += ret;
        }
        }
-
        lttng_consumer_sync_trace_file(stream, orig_offset);
        lttng_consumer_sync_trace_file(stream, orig_offset);
-
-       goto end;
-
 end:
 end:
-       return ret;
+       return written;
 }
 
 /*
 }
 
 /*
@@ -384,7 +394,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        assert(err == 0);
        /* write the subbuffer to the tracefile */
        ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
        assert(err == 0);
        /* write the subbuffer to the tracefile */
        ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
-       if (ret < 0) {
+       if (ret != len) {
                /*
                 * display the error but continue processing to try
                 * to release the subbuffer
                /*
                 * display the error but continue processing to try
                 * to release the subbuffer
This page took 0.033919 seconds and 5 git commands to generate.