wip rotate consumer pipe
authorJulien Desfossez <jdesfossez@efficios.com>
Thu, 28 Sep 2017 20:26:42 +0000 (16:26 -0400)
committerJulien Desfossez <jdesfossez@efficios.com>
Thu, 28 Sep 2017 20:26:42 +0000 (16:26 -0400)
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c

index c6fce97774f4888d22738d6661832fd90c6bb4e7..b37629d4b2b8bb7d2f5caa974886556a9c9b6223 100644 (file)
@@ -1134,6 +1134,9 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
 
        (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
        (*pollfd)[i + 1].events = POLLIN | POLLPRI;
+
+       (*pollfd)[i + 2].fd = lttng_pipe_get_readfd(ctx->consumer_data_rotate_pipe);
+       (*pollfd)[i + 2].events = POLLIN | POLLPRI;
        return i;
 }
 
@@ -1345,6 +1348,16 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_wakeup_pipe;
        }
 
+       ctx->consumer_data_rotate_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_data_rotate_pipe) {
+               goto error_data_rotate_pipe;
+       }
+
+       ctx->consumer_metadata_rotate_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_metadata_rotate_pipe) {
+               goto error_metadata_rotate_pipe;
+       }
+
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                PERROR("Error creating recv pipe");
@@ -1371,6 +1384,10 @@ error_metadata_pipe:
 error_channel_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
 error_quit_pipe:
+       lttng_pipe_destroy(ctx->consumer_metadata_rotate_pipe);
+error_metadata_rotate_pipe:
+       lttng_pipe_destroy(ctx->consumer_data_rotate_pipe);
+error_data_rotate_pipe:
        lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
 error_wakeup_pipe:
        lttng_pipe_destroy(ctx->consumer_data_pipe);
@@ -1459,6 +1476,8 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        lttng_pipe_destroy(ctx->consumer_data_pipe);
        lttng_pipe_destroy(ctx->consumer_metadata_pipe);
        lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+       lttng_pipe_destroy(ctx->consumer_data_rotate_pipe);
+       lttng_pipe_destroy(ctx->consumer_metadata_rotate_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
 
        unlink(ctx->consumer_command_sock_path);
@@ -2273,6 +2292,37 @@ static void validate_endpoint_status_metadata_stream(
        rcu_read_unlock();
 }
 
+static
+int handle_rotate_wakeup_pipe(struct lttng_consumer_local_data *ctx,
+               struct lttng_pipe *stream_pipe)
+{
+       int ret;
+       ssize_t pipe_len;
+       struct lttng_consumer_stream *stream;
+
+       pipe_len = lttng_pipe_read(stream_pipe, &stream, sizeof(stream));
+       if (pipe_len < sizeof(stream)) {
+               if (pipe_len < 0) {
+                       PERROR("read metadata stream");
+               }
+               ERR("Failed to read stream on metadata rotate pipe");
+               ret = -1;
+               goto end;
+       }
+
+       fprintf(stderr, "Rotate wakeup pipe, stream %lu\n", stream->key);
+       ret = lttng_consumer_rotate_stream(ctx, stream);
+       if (ret < 0) {
+               ERR("Failed to rotate metadata stream");
+               goto end;
+       }
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
 /*
  * Thread polls on metadata file descriptor and write them on disk or on the
  * network.
@@ -2301,7 +2351,7 @@ void *consumer_thread_metadata_poll(void *data)
        DBG("Thread metadata poll started");
 
        /* Size is set to 1 for the consumer_metadata pipe */
-       ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
+       ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
        if (ret < 0) {
                ERR("Poll set creation failed");
                goto end_poll;
@@ -2313,6 +2363,12 @@ void *consumer_thread_metadata_poll(void *data)
                goto end;
        }
 
+       ret = lttng_poll_add(&events,
+                       lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe), LPOLLIN);
+       if (ret < 0) {
+               goto end;
+       }
+
        /* Main loop */
        DBG("Metadata main loop started");
 
@@ -2401,6 +2457,35 @@ restart:
 
                                /* Handle other stream */
                                continue;
+                       } else if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe)) {
+                               if (revents & LPOLLIN) {
+                                       ret = handle_rotate_wakeup_pipe(ctx,
+                                                       ctx->consumer_metadata_rotate_pipe);
+                                       if (ret < 0) {
+                                               ERR("Failed to rotate metadata stream");
+                                               lttng_poll_del(&events,
+                                                               lttng_pipe_get_readfd(
+                                                                       ctx->consumer_metadata_rotate_pipe));
+                                               lttng_pipe_read_close(
+                                                               ctx->consumer_metadata_rotate_pipe);
+                                               goto end;
+                                       }
+                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
+                                       DBG("Metadata rotate pipe hung up");
+                                       fprintf(stderr, "Metadata rotate pipe hung up");
+                                       /*
+                                        * Remove the pipe from the poll set and continue the loop
+                                        * since their might be data to consume.
+                                        */
+                                       lttng_poll_del(&events,
+                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe));
+                                       lttng_pipe_read_close(ctx->consumer_metadata_rotate_pipe);
+                                       continue;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto end;
+                               }
+                               continue;
                        }
 
                        rcu_read_lock();
@@ -2543,17 +2628,19 @@ void *consumer_thread_data_poll(void *data)
                        local_stream = NULL;
 
                        /*
-                        * Allocate for all fds +1 for the consumer_data_pipe and +1 for
-                        * wake up pipe.
+                        * Allocate for all fds and:
+                        *   +1 for the consumer_data_pipe
+                        *   +1 for wake up pipe
+                        *   +1 for consumer_data_rotate_pipe.
                         */
-                       pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
+                       pollfd = zmalloc((consumer_data.stream_count + 3) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
 
-                       local_stream = zmalloc((consumer_data.stream_count + 2) *
+                       local_stream = zmalloc((consumer_data.stream_count + 3) *
                                        sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
@@ -2654,6 +2741,17 @@ void *consumer_thread_data_poll(void *data)
                        ctx->has_wakeup = 0;
                }
 
+               /* Handle consumer_data_rotate_pipe. */
+               if (pollfd[nb_fd + 2].revents & (POLLIN | POLLPRI)) {
+                       fprintf(stderr, "data wakeup pipe\n");
+                       ret = handle_rotate_wakeup_pipe(ctx,
+                                       ctx->consumer_data_rotate_pipe);
+                       if (ret < 0) {
+                               ERR("Failed to rotate metadata stream");
+                               goto end;
+                       }
+               }
+
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
                        health_code_update();
@@ -2772,6 +2870,7 @@ end:
         * the read side.
         */
        (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
+       (void) lttng_pipe_write_close(ctx->consumer_metadata_rotate_pipe);
 
 error_testpoint:
        if (err) {
@@ -3319,6 +3418,7 @@ int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
 {
        int ret;
 
+       fprintf(stderr, "Notif send\n");
        do {
                ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
        } while (ret == -1 && errno == EINTR);
@@ -3328,6 +3428,7 @@ int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
                DBG("Sent channel rotation notification for channel key %"
                                PRIu64, key);
        }
+       fprintf(stderr, "Notif done\n");
 
        return ret;
 }
@@ -3371,6 +3472,7 @@ int consumer_post_rotation(struct lttng_consumer_stream *stream,
                abort();
        }
 
+       fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending);
        if (--stream->chan->nr_stream_rotate_pending == 0) {
                ret = rotate_notify_sessiond(ctx, stream->chan->key);
        }
@@ -3411,6 +3513,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                pthread_cond_broadcast(&stream->metadata_rdv);
                pthread_mutex_unlock(&stream->metadata_rdv_lock);
        }
+       fprintf(stderr, "rotated: %d\n", stream->rotated);
        pthread_mutex_unlock(&stream->lock);
 
        rotate_ret = consumer_post_rotation(stream, ctx);
@@ -3945,7 +4048,7 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path,
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
 
-       DBG("Kernel consumer sample rotate position for channel %" PRIu64, key);
+       DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
        rcu_read_lock();
 
@@ -3984,14 +4087,14 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path,
                memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX);
                ret = lttng_consumer_sample_snapshot_positions(stream);
                if (ret < 0) {
-                       ERR("Taking kernel snapshot positions");
+                       ERR("Taking snapshot positions");
                        goto end_unlock;
                }
 
                ret = lttng_consumer_get_produced_snapshot(stream,
                                &stream->rotate_position);
                if (ret < 0) {
-                       ERR("Produced kernel snapshot position");
+                       ERR("Produced snapshot position");
                        goto end_unlock;
                }
                fprintf(stderr, "Stream %lu should rotate after %lu to %s\n",
@@ -4005,6 +4108,7 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path,
                        fprintf(stderr, "Stream %lu ready to rotate to %s\n",
                                        stream->key, channel->pathname);
                }
+               fprintf(stderr, "before increasinc nr_pending: %lu\n", channel->nr_stream_rotate_pending);
                channel->nr_stream_rotate_pending++;
 
                ret = consumer_flush_buffer(stream, 1);
@@ -4243,6 +4347,7 @@ int lttng_consumer_rotate_ready_streams(uint64_t key,
        int ret;
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
+       struct lttng_pipe *stream_pipe;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
 
@@ -4254,33 +4359,31 @@ int lttng_consumer_rotate_ready_streams(uint64_t key,
                ret = -1;
                goto end;
        }
+
+       if (channel->metadata_stream) {
+               fprintf(stderr, "M\n");
+               stream_pipe = ctx->consumer_metadata_rotate_pipe;
+       } else {
+               fprintf(stderr, "D\n");
+               stream_pipe = ctx->consumer_data_rotate_pipe;
+       }
+
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
                        ht->match_fct, &channel->key, &iter.iter,
                        stream, node_channel_id.node) {
                health_code_update();
 
-               /*
-                * Lock stream because we are about to change its state.
-                */
-               pthread_mutex_lock(&stream->lock);
                if (stream->rotate_ready == 0) {
-                       pthread_mutex_unlock(&stream->lock);
                        continue;
                }
-               ret = lttng_consumer_rotate_stream(ctx, stream);
-               if (ret < 0) {
-                       pthread_mutex_unlock(&stream->lock);
-                       ERR("Stream rotation error");
-                       goto end;
-               }
-
-               pthread_mutex_unlock(&stream->lock);
-               ret = consumer_post_rotation(stream, ctx);
+               fprintf(stderr, "send stream %lu on wakeup pipe\n", stream->key);
+               ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
                if (ret < 0) {
-                       ERR("Failed after a rotation");
+                       ERR("Failed to wakeup consumer rotate pipe");
                        goto end;
                }
+               fprintf(stderr, "done sending stream %lu on wakeup pipe\n", stream->key);
        }
 
        ret = 0;
index 0ee937c7a488053e3df5e2b06571c6538857392e..8b87838e65bf0f4f88ff714185072bfdae2c1df0 100644 (file)
@@ -606,6 +606,16 @@ struct lttng_consumer_local_data {
         * its rotation (write-only).
         */
        int channel_rotate_pipe;
+       /*
+        * Pipe to wakeup the data thread if a stream needs to rotated
+        * immediately (vs waiting for more data).
+        */
+       struct lttng_pipe *consumer_data_rotate_pipe;
+       /*
+        * Pipe to wakeup the metadata thread if a stream needs to rotated
+        * immediately (vs waiting for more data).
+        */
+       struct lttng_pipe *consumer_metadata_rotate_pipe;
 };
 
 /*
index b957309258f0c38654020abd33355c0b4b3ee1c6..f33372a493092571bab86e8fbb171f130b912789 100644 (file)
@@ -1156,19 +1156,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
-               /*
-                * Rotate the streams that are ready right now.
-                * FIXME: this is a second consecutive iteration over the
-                * streams in a channel, there is probably a better way to
-                * handle this, but it needs to be after the
-                * consumer_send_status_msg() call.
-                */
+               /* Rotate the streams that are ready right now.  */
                ret = lttng_consumer_rotate_ready_streams(
                                msg.u.rotate_channel.key, ctx);
                if (ret < 0) {
-                       ERR("Rotate channel failed");
+                       ERR("Rotate ready streams failed");
                        ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
                }
+
                break;
        }
        case LTTNG_CONSUMER_ROTATE_RENAME:
@@ -1509,6 +1504,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
 
        rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len);
+       fprintf(stderr, "consumer read stream %lu, len %lu, ready = %d\n", stream->key,
+                       len, rotate_ready);
        if (rotate_ready < 0) {
                ERR("Failed to check if stream is ready for rotation");
                err = kernctl_put_subbuf(infd);
@@ -1666,6 +1663,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                ret = err;
                goto error;
        }
+       fprintf(stderr, "consumer read stream %lu done\n", stream->key);
 
        /* Write index if needed. */
        if (!write_index) {
This page took 0.034505 seconds and 5 git commands to generate.