post-rotation cleanup
authorJulien Desfossez <jdesfossez@efficios.com>
Wed, 26 Jul 2017 20:01:04 +0000 (16:01 -0400)
committerJulien Desfossez <jdesfossez@efficios.com>
Wed, 30 Aug 2017 19:29:02 +0000 (15:29 -0400)
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 90e632dc37caadab30866e4a6719a19e7198bfa4..1a8dadad8bb760fa93b4a2c2b0ab6f56e1c65be2 100644 (file)
@@ -3312,10 +3312,76 @@ error_testpoint:
        return NULL;
 }
 
+static
+int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
+               uint64_t key)
+{
+       int ret;
+
+       do {
+               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
+       } while (ret == -1 && errno == EINTR);
+       if (ret == -1) {
+               PERROR("write to the channel rotate pipe");
+       } else {
+               DBG("Sent channel rotation notification for channel key %"
+                               PRIu64, key);
+       }
+
+       return ret;
+}
+
+/*
+ * Perform operations that need to be done after a stream has
+ * rotated and released the stream lock.
+ *
+ * Multiple rotations cannot occur simultaneously, so we know the state of the
+ * "rotated" stream flag cannot change.
+ *
+ * This MUST be called WITHOUT the stream lock held.
+ */
+static
+int consumer_post_rotation(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+
+       if (!stream->rotated) {
+               goto end;
+       }
+
+       pthread_mutex_lock(&stream->chan->lock);
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               /*
+                * Wakeup the metadata thread so it dumps the metadata cache
+                * to file again.
+                */
+               consumer_metadata_wakeup_pipe(stream->chan);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+       if (--stream->chan->nr_stream_rotate_pending == 0) {
+               ret = rotate_notify_sessiond(ctx, stream->chan->key);
+       }
+       pthread_mutex_unlock(&stream->chan->lock);
+       stream->rotated = 0;
+
+end:
+       return ret;
+}
+
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        ssize_t ret;
+       int rotate_ret;
 
        pthread_mutex_lock(&stream->lock);
        if (stream->metadata_flag) {
@@ -3342,6 +3408,13 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                pthread_mutex_unlock(&stream->metadata_rdv_lock);
        }
        pthread_mutex_unlock(&stream->lock);
+
+       rotate_ret = consumer_post_rotation(stream, ctx);
+       if (rotate_ret < 0) {
+               ERR("Failed after a rotation");
+               ret = -1;
+       }
+
        return ret;
 }
 
@@ -3828,7 +3901,7 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
 }
 
 static
-int flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
+int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
 {
        int ret = 0;
 
@@ -3891,6 +3964,7 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path,
                 * Lock stream because we are about to change its state.
                 */
                pthread_mutex_lock(&stream->lock);
+               memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX);
                ret = lttng_consumer_sample_snapshot_positions(stream);
                if (ret < 0) {
                        ERR("Taking kernel snapshot positions");
@@ -3918,7 +3992,7 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path,
                }
                channel->nr_stream_rotate_pending++;
 
-               ret = flush_buffer(stream, 1);
+               ret = consumer_flush_buffer(stream, 1);
                if (ret < 0) {
                        ERR("Failed to flush stream");
                        goto end_unlock;
@@ -3939,25 +4013,6 @@ end:
        return ret;
 }
 
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
-               uint64_t key)
-{
-       int ret;
-
-       do {
-               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
-       } while (ret == -1 && errno == EINTR);
-       if (ret == -1) {
-               PERROR("write to the channel rotate pipe");
-       } else {
-               DBG("Sent channel rotation notification for channel key %"
-                               PRIu64, key);
-       }
-
-       return ret;
-}
-
 /*
  * Performs the stream rotation for the rotate session feature if needed.
  * It must be called with the stream and channel locks held.
@@ -4012,9 +4067,9 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
        }
 
        fprintf(stderr, "Rotating stream %lu to %s/%s\n", stream->key,
-                       stream->chan->pathname, stream->name);
-       ret = utils_create_stream_file(stream->chan->pathname, stream->name,
-                       stream->chan->tracefile_size, stream->tracefile_count_current,
+                       stream->channel_ro_pathname, stream->name);
+       ret = utils_create_stream_file(stream->channel_ro_pathname, stream->name,
+                       stream->channel_ro_tracefile_size, stream->tracefile_count_current,
                        stream->uid, stream->gid, NULL);
        if (ret < 0) {
                goto error;
@@ -4027,9 +4082,9 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
 
                lttng_index_file_put(stream->index_file);
 
-               index_file = lttng_index_file_create(stream->chan->pathname,
+               index_file = lttng_index_file_create(stream->channel_ro_pathname,
                                stream->name, stream->uid, stream->gid,
-                               stream->chan->tracefile_size,
+                               stream->channel_ro_tracefile_size,
                                stream->tracefile_count_current,
                                CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
                if (!index_file) {
@@ -4046,7 +4101,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                         */
                        ret = kernctl_metadata_cache_dump(stream->wait_fd);
                        if (ret < 0) {
-                               ERR("Failed to dump the metadata cache after rotation");
+                               ERR("Failed to dump the kernel metadata cache after rotation");
                                goto error;
                        }
                        break;
@@ -4057,12 +4112,6 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                         * will write from the beginning on the next push.
                         */
                        stream->ust_metadata_pushed = 0;
-                       /*
-                        * Wakeup the metadata thread so it dumps the metadata cache
-                        * to file again.
-                        * FIXME: post-pone that after we have released the stream lock.
-                        */
-                       consumer_metadata_wakeup_pipe(stream->chan);
                        break;
                default:
                        ERR("Unknown consumer_data type");
@@ -4072,11 +4121,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
 
        stream->rotate_position = 0;
        stream->rotate_ready = 0;
-
-       if (--stream->chan->nr_stream_rotate_pending == 0) {
-               rotate_notify_sessiond(ctx, stream->chan->key);
-               fprintf(stderr, "SENT %lu\n", stream->chan->key);
-       }
+       stream->rotated = 1;
 
        ret = 0;
        goto end;
@@ -4135,6 +4180,11 @@ int lttng_consumer_rotate_ready_streams(uint64_t key,
                }
 
                pthread_mutex_unlock(&stream->lock);
+               ret = consumer_post_rotation(stream, ctx);
+               if (ret < 0) {
+                       ERR("Failed after a rotation");
+                       goto end;
+               }
        }
 
        ret = 0;
index 77256c146b9178257ec1921c8f2440cdb23664b8..d8df7adb4886ed05c13fd181ffddb5d8899bc8fd 100644 (file)
@@ -446,6 +446,12 @@ struct lttng_consumer_stream {
         * use the produced/consumed positions as reference.
         */
        unsigned int rotate_ready:1;
+       /*
+        * Flag set to 1 if the stream just got rotated. This is used to
+        * perform actions on the channel after a rotation without needing
+        * to nest the channel lock inside the stream lock.
+        */
+       unsigned int rotated:1;
 
        /* Indicate if the stream still has some data to be read. */
        unsigned int has_data:1;
index 89be8de0ab0626f1c7d92b3226544937f72bea62..76c0310635a402e78dd37277126d46ca2433cbce 100644 (file)
@@ -1428,7 +1428,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                DBG("Reserving sub buffer failed (everything is normal, "
                                "it is due to concurrency)");
                ret = err;
-               goto end;
+               goto error;
        }
 
        /* Get the full subbuffer size including padding */
@@ -1444,10 +1444,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                        }
                        ret = err;
-                       goto end;
+                       goto error;
                }
                ret = err;
-               goto end;
+               goto error;
        }
 
        if (!stream->metadata_flag) {
@@ -1462,9 +1462,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                                }
                                ret = err;
-                               goto end;
+                               goto error;
                        }
-                       goto end;
+                       goto error;
                }
                ret = update_stream_stats(stream);
                if (ret < 0) {
@@ -1477,9 +1477,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                                }
                                ret = err;
-                               goto end;
+                               goto error;
                        }
-                       goto end;
+                       goto error;
                }
        } else {
                write_index = 0;
@@ -1494,9 +1494,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                                }
                                ret = err;
-                               goto end;
+                               goto error;
                        }
-                       goto end;
+                       goto error;
                }
        }
 
@@ -1541,10 +1541,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                                }
                                ret = err;
-                               goto end;
+                               goto error;
                        }
                        ret = err;
-                       goto end;
+                       goto error;
                }
 
                /* Make sure the tracer is not gone mad on us! */
@@ -1587,12 +1587,12 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                }
                ret = err;
-               goto end;
+               goto error;
        }
 
        /* Write index if needed. */
        if (!write_index) {
-               goto end;
+               goto rotate;
        }
 
        if (stream->chan->live_timer_interval && !stream->metadata_flag) {
@@ -1616,24 +1616,23 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        pthread_mutex_unlock(&stream->metadata_timer_lock);
                }
                if (err < 0) {
-                       goto end;
+                       goto error;
                }
        }
 
        err = consumer_stream_write_index(stream, &index);
        if (err < 0) {
-               goto end;
+               goto error;
        }
 
-end:
-       pthread_mutex_lock(&stream->chan->lock);
+rotate:
        rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
        if (rotation_ret < 0) {
-               pthread_mutex_unlock(&stream->chan->lock);
                ERR("Stream rotation error");
-               goto end;
+               goto error;
        }
-       pthread_mutex_unlock(&stream->chan->lock);
+
+error:
        return ret;
 }
 
index 668de5775c61f22f168e28194d4f5477619f9956..db6ec8bfffe1b8900fb3320192f39a1acdda09ec 100644 (file)
@@ -2589,7 +2589,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                readlen = lttng_read(stream->wait_fd, &dummy, 1);
                if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
                        ret = readlen;
-                       goto end;
+                       goto error;
                }
        }
 
@@ -2604,7 +2604,7 @@ retry:
                if (stream->metadata_flag) {
                        ret = commit_one_metadata_packet(stream);
                        if (ret <= 0) {
-                               goto end;
+                               goto error;
                        }
                        ustctl_flush_buffer(stream->ustream, 1);
                        goto retry;
@@ -2619,7 +2619,7 @@ retry:
                 */
                DBG("Reserving sub buffer failed (everything is normal, "
                                "it is due to concurrency) [ret: %d]", err);
-               goto end;
+               goto error;
        }
        assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
 
@@ -2629,7 +2629,7 @@ retry:
                if (ret < 0) {
                        err = ustctl_put_subbuf(ustream);
                        assert(err == 0);
-                       goto end;
+                       goto error;
                }
 
                /* Update the stream's sequence and discarded events count. */
@@ -2638,7 +2638,7 @@ retry:
                        PERROR("kernctl_get_events_discarded");
                        err = ustctl_put_subbuf(ustream);
                        assert(err == 0);
-                       goto end;
+                       goto error;
                }
        } else {
                write_index = 0;
@@ -2687,13 +2687,13 @@ retry:
        if (!stream->metadata_flag) {
                ret = notify_if_more_data(stream, ctx);
                if (ret < 0) {
-                       goto end;
+                       goto error;
                }
        }
 
        /* Write index if needed. */
        if (!write_index) {
-               goto end;
+               goto rotate;
        }
 
        if (stream->chan->live_timer_interval && !stream->metadata_flag) {
@@ -2718,28 +2718,24 @@ retry:
                }
 
                if (err < 0) {
-                       goto end;
+                       goto error;
                }
        }
 
        assert(!stream->metadata_flag);
        err = consumer_stream_write_index(stream, &index);
        if (err < 0) {
-               goto end;
+               goto error;
        }
 
-end:
-       /* FIXME: do we need this lock, it causes deadlocks when called
-        * at the same time with lttng_ustconsumer_rotate_channel ? */
-//     pthread_mutex_lock(&stream->chan->lock);
+rotate:
        rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
        if (rotation_ret < 0) {
-//             pthread_mutex_unlock(&stream->chan->lock);
                ret = -1;
                ERR("Stream rotation error");
-               goto end;
+               goto error;
        }
-//     pthread_mutex_unlock(&stream->chan->lock);
+error:
        return ret;
 }
 
This page took 0.038547 seconds and 5 git commands to generate.