fix sampling positions after the put_next + fix data_rotate_pipe handling
authorJulien Desfossez <jdesfossez@efficios.com>
Fri, 29 Sep 2017 16:25:17 +0000 (12:25 -0400)
committerJulien Desfossez <jdesfossez@efficios.com>
Fri, 29 Sep 2017 16:25:17 +0000 (12:25 -0400)
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index b37629d4b2b8bb7d2f5caa974886556a9c9b6223..344a72eab51c6d946e3e8a3c2f7a35a63bc7a41a 100644 (file)
@@ -2292,6 +2292,77 @@ static void validate_endpoint_status_metadata_stream(
        rcu_read_unlock();
 }
 
+static
+int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
+               uint64_t key)
+{
+       int ret;
+
+       fprintf(stderr, "Notif send\n");
+       do {
+               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
+       } while (ret == -1 && errno == EINTR);
+       if (ret == -1) {
+               PERROR("write to the channel rotate pipe");
+       } else {
+               DBG("Sent channel rotation notification for channel key %"
+                               PRIu64, key);
+       }
+       fprintf(stderr, "Notif done\n");
+
+       return ret;
+}
+
+/*
+ * Perform operations that need to be done after a stream has
+ * rotated and released the stream lock.
+ *
+ * Multiple rotations cannot occur simultaneously, so we know the state of the
+ * "rotated" stream flag cannot change.
+ *
+ * This MUST be called WITHOUT the stream lock held.
+ */
+static
+int consumer_post_rotation(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+
+       if (!stream->rotated) {
+               goto end;
+       }
+
+       pthread_mutex_lock(&stream->chan->lock);
+       switch (consumer_data.type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * The ust_metadata_pushed counter has been reset to 0, so now
+                        * we can wakeup the metadata thread so it dumps the metadata
+                        * cache to the new file.
+                        */
+                       if (stream->metadata_flag) {
+                               consumer_metadata_wakeup_pipe(stream->chan);
+                       }
+                       break;
+               default:
+                       ERR("Unknown consumer_data type");
+                       abort();
+       }
+
+       fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending);
+       if (--stream->chan->nr_stream_rotate_pending == 0) {
+               ret = rotate_notify_sessiond(ctx, stream->chan->key);
+       }
+       pthread_mutex_unlock(&stream->chan->lock);
+       stream->rotated = 0;
+
+end:
+       return ret;
+}
+
 static
 int handle_rotate_wakeup_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_pipe *stream_pipe)
@@ -2310,12 +2381,19 @@ int handle_rotate_wakeup_pipe(struct lttng_consumer_local_data *ctx,
                goto end;
        }
 
+       pthread_mutex_lock(&stream->lock);
        fprintf(stderr, "Rotate wakeup pipe, stream %lu\n", stream->key);
        ret = lttng_consumer_rotate_stream(ctx, stream);
+       pthread_mutex_unlock(&stream->lock);
        if (ret < 0) {
                ERR("Failed to rotate metadata stream");
                goto end;
        }
+       ret = consumer_post_rotation(stream, ctx);
+       if (ret < 0) {
+               ERR("Failed after a rotation");
+               ret = -1;
+       }
 
        ret = 0;
 
@@ -2589,7 +2667,7 @@ void *consumer_thread_data_poll(void *data)
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
        /* local view of consumer_data.fds_count */
-       int nb_fd = 0;
+       int nb_fd = 0, nb_pipes_fd;
        struct lttng_consumer_local_data *ctx = data;
        ssize_t len;
 
@@ -2628,19 +2706,20 @@ void *consumer_thread_data_poll(void *data)
                        local_stream = NULL;
 
                        /*
-                        * Allocate for all fds and:
+                        * Allocate for all fds + 3:
                         *   +1 for the consumer_data_pipe
                         *   +1 for wake up pipe
                         *   +1 for consumer_data_rotate_pipe.
                         */
-                       pollfd = zmalloc((consumer_data.stream_count + 3) * sizeof(struct pollfd));
+                       nb_pipes_fd = 3;
+                       pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
 
-                       local_stream = zmalloc((consumer_data.stream_count + 3) *
+                       local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
                                        sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
@@ -2667,12 +2746,12 @@ void *consumer_thread_data_poll(void *data)
                }
                /* poll on the array of fds */
        restart:
-               DBG("polling on %d fd", nb_fd + 2);
+               DBG("polling on %d fd", nb_fd + nb_pipes_fd);
                if (testpoint(consumerd_thread_data_poll)) {
                        goto end;
                }
                health_poll_entry();
-               num_rdy = poll(pollfd, nb_fd + 2, -1);
+               num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
                health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
@@ -3412,77 +3491,6 @@ error_testpoint:
        return NULL;
 }
 
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
-               uint64_t key)
-{
-       int ret;
-
-       fprintf(stderr, "Notif send\n");
-       do {
-               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
-       } while (ret == -1 && errno == EINTR);
-       if (ret == -1) {
-               PERROR("write to the channel rotate pipe");
-       } else {
-               DBG("Sent channel rotation notification for channel key %"
-                               PRIu64, key);
-       }
-       fprintf(stderr, "Notif done\n");
-
-       return ret;
-}
-
-/*
- * Perform operations that need to be done after a stream has
- * rotated and released the stream lock.
- *
- * Multiple rotations cannot occur simultaneously, so we know the state of the
- * "rotated" stream flag cannot change.
- *
- * This MUST be called WITHOUT the stream lock held.
- */
-static
-int consumer_post_rotation(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
-{
-       int ret = 0;
-
-       if (!stream->rotated) {
-               goto end;
-       }
-
-       pthread_mutex_lock(&stream->chan->lock);
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               /*
-                * The ust_metadata_pushed counter has been reset to 0, so now
-                * we can wakeup the metadata thread so it dumps the metadata
-                * cache to the new file.
-                */
-               if (stream->metadata_flag) {
-                       consumer_metadata_wakeup_pipe(stream->chan);
-               }
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               abort();
-       }
-
-       fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending);
-       if (--stream->chan->nr_stream_rotate_pending == 0) {
-               ret = rotate_notify_sessiond(ctx, stream->chan->key);
-       }
-       pthread_mutex_unlock(&stream->chan->lock);
-       stream->rotated = 0;
-
-end:
-       return ret;
-}
-
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
@@ -4133,16 +4141,10 @@ end:
 /*
  * Check if a stream is ready to be rotated after extracting it.
  *
- * When we are called between get_next_subbuf and put_next_subbuf, the len
- * parameter is the subbuf size of the current subbuffer being extracted. This
- * len is with padding, so it is normal to see that the current position is
- * farther than the expected rotate position.
- *
  * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
- * error.
+ * error. Stream lock must be held.
  */
-int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
-               unsigned long len)
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
 {
        int ret;
        unsigned long consumed_pos;
@@ -4171,18 +4173,18 @@ int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
 
        ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
        if (ret < 0) {
-               ERR("Produced kernel snapshot position");
+               ERR("Consumed kernel snapshot position");
                goto end;
        }
 
        fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
        /* Rotate position not reached yet. */
-       if ((consumed_pos + len) < stream->rotate_position) {
+       if (consumed_pos < stream->rotate_position) {
                ret = 0;
                goto end;
        }
        fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
-                       consumed_pos + len, stream->rotate_position,
+                       consumed_pos, stream->rotate_position,
                        stream->key);
        ret = 1;
 
index 8b87838e65bf0f4f88ff714185072bfdae2c1df0..890e97c4d93ca2b21b86660df347e231455bdb34 100644 (file)
@@ -448,11 +448,8 @@ struct lttng_consumer_stream {
        uint64_t channel_ro_tracefile_size;
 
        /*
-        * If rotate_ready is set to 1, rotate the stream the next time data
-        * need to be extracted, regardless of the rotate_position. This is
-        * used if all the metadata has been consumed when we rotate. In this
-        * case, the snapshot of the positions returns -EAGAIN and we cannot
-        * use the produced/consumed positions as reference.
+        * Flag to inform the data or metadata thread that a stream is
+        * ready to be rotated.
         */
        unsigned int rotate_ready:1;
        /*
@@ -837,8 +834,7 @@ int consumer_create_index_file(struct lttng_consumer_stream *stream);
 int lttng_consumer_rotate_channel(uint64_t key, char *path,
                uint64_t relayd_id, uint32_t metadata,
                uint64_t new_chunk_id, struct lttng_consumer_local_data *ctx);
-int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
-               unsigned long len);
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream);
 int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream);
 int lttng_consumer_rotate_ready_streams(uint64_t key,
index f33372a493092571bab86e8fbb171f130b912789..f3d074c7c606748f133719a5163d341ebcc04a30 100644 (file)
@@ -1462,7 +1462,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 1, rotation_ret, rotate_ready;
+       int err, write_index = 1, rotation_ret;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
        struct ctf_packet_index index;
@@ -1503,27 +1503,6 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto error;
        }
 
-       rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len);
-       fprintf(stderr, "consumer read stream %lu, len %lu, ready = %d\n", stream->key,
-                       len, rotate_ready);
-       if (rotate_ready < 0) {
-               ERR("Failed to check if stream is ready for rotation");
-               err = kernctl_put_subbuf(infd);
-               if (err != 0) {
-                       if (err == -EFAULT) {
-                               PERROR("Error in unreserving sub buffer\n");
-                       } else if (err == -EIO) {
-                               /* Should never happen with newer LTTng versions */
-                               PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
-                       }
-                       ret = err;
-                       goto error;
-               }
-               ret = -1;
-               goto error;
-       }
-       stream->rotate_ready = rotate_ready;
-
        if (!stream->metadata_flag) {
                ret = get_index_values(&index, infd);
                if (ret < 0) {
@@ -1701,13 +1680,18 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
 
 rotate:
-       if (stream->rotate_ready) {
+       rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+       if (rotation_ret == 1) {
                rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
                if (rotation_ret < 0) {
                        ERR("Stream rotation error");
                        ret = -1;
                        goto error;
                }
+       } else if (rotation_ret < 0) {
+               ERR("Checking if stream is ready to rotate");
+               ret = -1;
+               goto error;
        }
 
 error:
index d2552cb63f451a8d7416ca4d0a696e0bc4904244..dcce32189d86d9411757ea8ef37df8e4e3961af2 100644 (file)
@@ -2607,7 +2607,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 1, rotation_ret, rotate_ready;
+       int err, write_index = 1, rotation_ret;
        long ret = 0;
        struct ustctl_consumer_stream *ustream;
        struct ctf_packet_index index;
@@ -2707,16 +2707,6 @@ retry:
 
        padding = len - subbuf_size;
 
-       rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len);
-       if (rotate_ready < 0) {
-               ERR("Failed to check if stream is ready for rotation");
-               ret = -1;
-               err = ustctl_put_subbuf(ustream);
-               assert(err == 0);
-               goto error;
-       }
-       stream->rotate_ready = rotate_ready;
-
        /* write the subbuffer to the tracefile */
        ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
        /*
@@ -2790,13 +2780,18 @@ retry:
        }
 
 rotate:
-       if (stream->rotate_ready) {
+       rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+       if (rotation_ret == 1) {
                rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
                if (rotation_ret < 0) {
-                       ret = -1;
                        ERR("Stream rotation error");
+                       ret = -1;
                        goto error;
                }
+       } else if (rotation_ret < 0) {
+               ERR("Checking if stream is ready to rotate");
+               ret = -1;
+               goto error;
        }
 error:
        return ret;
This page took 0.032767 seconds and 5 git commands to generate.