fix sampling positions after the put_next + fix data_rotate_pipe handling
[deliverable/lttng-tools.git] / src / common / consumer / consumer.c
index b37629d4b2b8bb7d2f5caa974886556a9c9b6223..344a72eab51c6d946e3e8a3c2f7a35a63bc7a41a 100644 (file)
@@ -2292,6 +2292,77 @@ static void validate_endpoint_status_metadata_stream(
        rcu_read_unlock();
 }
 
+static
+int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
+               uint64_t key)
+{
+       int ret;
+
+       fprintf(stderr, "Notif send\n");
+       do {
+               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
+       } while (ret == -1 && errno == EINTR);
+       if (ret == -1) {
+               PERROR("write to the channel rotate pipe");
+       } else {
+               DBG("Sent channel rotation notification for channel key %"
+                               PRIu64, key);
+       }
+       fprintf(stderr, "Notif done\n");
+
+       return ret;
+}
+
+/*
+ * Perform operations that need to be done after a stream has
+ * rotated and released the stream lock.
+ *
+ * Multiple rotations cannot occur simultaneously, so we know the state of the
+ * "rotated" stream flag cannot change.
+ *
+ * This MUST be called WITHOUT the stream lock held.
+ */
+static
+int consumer_post_rotation(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+
+       if (!stream->rotated) {
+               goto end;
+       }
+
+       pthread_mutex_lock(&stream->chan->lock);
+       switch (consumer_data.type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * The ust_metadata_pushed counter has been reset to 0, so now
+                        * we can wakeup the metadata thread so it dumps the metadata
+                        * cache to the new file.
+                        */
+                       if (stream->metadata_flag) {
+                               consumer_metadata_wakeup_pipe(stream->chan);
+                       }
+                       break;
+               default:
+                       ERR("Unknown consumer_data type");
+                       abort();
+       }
+
+       fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending);
+       if (--stream->chan->nr_stream_rotate_pending == 0) {
+               ret = rotate_notify_sessiond(ctx, stream->chan->key);
+       }
+       pthread_mutex_unlock(&stream->chan->lock);
+       stream->rotated = 0;
+
+end:
+       return ret;
+}
+
 static
 int handle_rotate_wakeup_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_pipe *stream_pipe)
@@ -2310,12 +2381,19 @@ int handle_rotate_wakeup_pipe(struct lttng_consumer_local_data *ctx,
                goto end;
        }
 
+       pthread_mutex_lock(&stream->lock);
        fprintf(stderr, "Rotate wakeup pipe, stream %lu\n", stream->key);
        ret = lttng_consumer_rotate_stream(ctx, stream);
+       pthread_mutex_unlock(&stream->lock);
        if (ret < 0) {
                ERR("Failed to rotate metadata stream");
                goto end;
        }
+       ret = consumer_post_rotation(stream, ctx);
+       if (ret < 0) {
+               ERR("Failed after a rotation");
+               ret = -1;
+       }
 
        ret = 0;
 
@@ -2589,7 +2667,7 @@ void *consumer_thread_data_poll(void *data)
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
        /* local view of consumer_data.fds_count */
-       int nb_fd = 0;
+       int nb_fd = 0, nb_pipes_fd;
        struct lttng_consumer_local_data *ctx = data;
        ssize_t len;
 
@@ -2628,19 +2706,20 @@ void *consumer_thread_data_poll(void *data)
                        local_stream = NULL;
 
                        /*
-                        * Allocate for all fds and:
+                        * Allocate for all fds + 3:
                         *   +1 for the consumer_data_pipe
                         *   +1 for wake up pipe
                         *   +1 for consumer_data_rotate_pipe.
                         */
-                       pollfd = zmalloc((consumer_data.stream_count + 3) * sizeof(struct pollfd));
+                       nb_pipes_fd = 3;
+                       pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
 
-                       local_stream = zmalloc((consumer_data.stream_count + 3) *
+                       local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
                                        sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
@@ -2667,12 +2746,12 @@ void *consumer_thread_data_poll(void *data)
                }
                /* poll on the array of fds */
        restart:
-               DBG("polling on %d fd", nb_fd + 2);
+               DBG("polling on %d fd", nb_fd + nb_pipes_fd);
                if (testpoint(consumerd_thread_data_poll)) {
                        goto end;
                }
                health_poll_entry();
-               num_rdy = poll(pollfd, nb_fd + 2, -1);
+               num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
                health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
@@ -3412,77 +3491,6 @@ error_testpoint:
        return NULL;
 }
 
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
-               uint64_t key)
-{
-       int ret;
-
-       fprintf(stderr, "Notif send\n");
-       do {
-               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
-       } while (ret == -1 && errno == EINTR);
-       if (ret == -1) {
-               PERROR("write to the channel rotate pipe");
-       } else {
-               DBG("Sent channel rotation notification for channel key %"
-                               PRIu64, key);
-       }
-       fprintf(stderr, "Notif done\n");
-
-       return ret;
-}
-
-/*
- * Perform operations that need to be done after a stream has
- * rotated and released the stream lock.
- *
- * Multiple rotations cannot occur simultaneously, so we know the state of the
- * "rotated" stream flag cannot change.
- *
- * This MUST be called WITHOUT the stream lock held.
- */
-static
-int consumer_post_rotation(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
-{
-       int ret = 0;
-
-       if (!stream->rotated) {
-               goto end;
-       }
-
-       pthread_mutex_lock(&stream->chan->lock);
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               /*
-                * The ust_metadata_pushed counter has been reset to 0, so now
-                * we can wakeup the metadata thread so it dumps the metadata
-                * cache to the new file.
-                */
-               if (stream->metadata_flag) {
-                       consumer_metadata_wakeup_pipe(stream->chan);
-               }
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               abort();
-       }
-
-       fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending);
-       if (--stream->chan->nr_stream_rotate_pending == 0) {
-               ret = rotate_notify_sessiond(ctx, stream->chan->key);
-       }
-       pthread_mutex_unlock(&stream->chan->lock);
-       stream->rotated = 0;
-
-end:
-       return ret;
-}
-
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
@@ -4133,16 +4141,10 @@ end:
 /*
  * Check if a stream is ready to be rotated after extracting it.
  *
- * When we are called between get_next_subbuf and put_next_subbuf, the len
- * parameter is the subbuf size of the current subbuffer being extracted. This
- * len is with padding, so it is normal to see that the current position is
- * farther than the expected rotate position.
- *
  * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
- * error.
+ * error. Stream lock must be held.
  */
-int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
-               unsigned long len)
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
 {
        int ret;
        unsigned long consumed_pos;
@@ -4171,18 +4173,18 @@ int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
 
        ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
        if (ret < 0) {
-               ERR("Produced kernel snapshot position");
+               ERR("Consumed kernel snapshot position");
                goto end;
        }
 
        fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
        /* Rotate position not reached yet. */
-       if ((consumed_pos + len) < stream->rotate_position) {
+       if (consumed_pos < stream->rotate_position) {
                ret = 0;
                goto end;
        }
        fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
-                       consumed_pos + len, stream->rotate_position,
+                       consumed_pos, stream->rotate_position,
                        stream->key);
        ret = 1;
 
This page took 0.028066 seconds and 5 git commands to generate.