fix: compare position safely even in case of overflow
[lttng-tools.git] / src / common / consumer / consumer.c
index 1a8dadad8bb760fa93b4a2c2b0ab6f56e1c65be2..2bfcef37ae674aedec8bc2b889d074654df70aaf 100644 (file)
@@ -789,7 +789,7 @@ error:
  * Returns 0 on success, < 0 on error
  */
 int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
-               char *path)
+               char *path, enum lttng_domain_type domain)
 {
        int ret = 0;
        struct consumer_relayd_sock_pair *relayd;
@@ -806,7 +806,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_add_stream(&relayd->control_sock, stream->name,
                                path, &stream->relayd_stream_id,
-                               stream->chan->tracefile_size, stream->chan->tracefile_count);
+                               stream->chan->tracefile_size, stream->chan->tracefile_count,
+                               domain);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
                        goto end;
@@ -1133,6 +1134,9 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
 
        (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
        (*pollfd)[i + 1].events = POLLIN | POLLPRI;
+
+       (*pollfd)[i + 2].fd = lttng_pipe_get_readfd(ctx->consumer_data_rotate_pipe);
+       (*pollfd)[i + 2].events = POLLIN | POLLPRI;
        return i;
 }
 
@@ -1344,6 +1348,16 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_wakeup_pipe;
        }
 
+       ctx->consumer_data_rotate_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_data_rotate_pipe) {
+               goto error_data_rotate_pipe;
+       }
+
+       ctx->consumer_metadata_rotate_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_metadata_rotate_pipe) {
+               goto error_metadata_rotate_pipe;
+       }
+
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                PERROR("Error creating recv pipe");
@@ -1370,6 +1384,10 @@ error_metadata_pipe:
 error_channel_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
 error_quit_pipe:
+       lttng_pipe_destroy(ctx->consumer_metadata_rotate_pipe);
+error_metadata_rotate_pipe:
+       lttng_pipe_destroy(ctx->consumer_data_rotate_pipe);
+error_data_rotate_pipe:
        lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
 error_wakeup_pipe:
        lttng_pipe_destroy(ctx->consumer_data_pipe);
@@ -1458,6 +1476,8 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        lttng_pipe_destroy(ctx->consumer_data_pipe);
        lttng_pipe_destroy(ctx->consumer_metadata_pipe);
        lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+       lttng_pipe_destroy(ctx->consumer_data_rotate_pipe);
+       lttng_pipe_destroy(ctx->consumer_metadata_rotate_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
 
        unlink(ctx->consumer_command_sock_path);
@@ -2272,6 +2292,115 @@ static void validate_endpoint_status_metadata_stream(
        rcu_read_unlock();
 }
 
+static
+int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
+               uint64_t key)
+{
+       int ret;
+
+       fprintf(stderr, "Notif send\n");
+       do {
+               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
+       } while (ret == -1 && errno == EINTR);
+       if (ret == -1) {
+               PERROR("write to the channel rotate pipe");
+       } else {
+               DBG("Sent channel rotation notification for channel key %"
+                               PRIu64, key);
+       }
+       fprintf(stderr, "Notif done\n");
+
+       return ret;
+}
+
+/*
+ * Perform operations that need to be done after a stream has
+ * rotated and released the stream lock.
+ *
+ * Multiple rotations cannot occur simultaneously, so we know the state of the
+ * "rotated" stream flag cannot change.
+ *
+ * This MUST be called WITHOUT the stream lock held.
+ */
+static
+int consumer_post_rotation(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+
+       if (!stream->rotated) {
+               goto end;
+       }
+
+       pthread_mutex_lock(&stream->chan->lock);
+       switch (consumer_data.type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * The ust_metadata_pushed counter has been reset to 0, so now
+                        * we can wakeup the metadata thread so it dumps the metadata
+                        * cache to the new file.
+                        */
+                       if (stream->metadata_flag) {
+                               consumer_metadata_wakeup_pipe(stream->chan);
+                       }
+                       break;
+               default:
+                       ERR("Unknown consumer_data type");
+                       abort();
+       }
+
+       fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending);
+       if (--stream->chan->nr_stream_rotate_pending == 0) {
+               ret = rotate_notify_sessiond(ctx, stream->chan->key);
+       }
+       pthread_mutex_unlock(&stream->chan->lock);
+       stream->rotated = 0;
+
+end:
+       return ret;
+}
+
+static
+int handle_rotate_wakeup_pipe(struct lttng_consumer_local_data *ctx,
+               struct lttng_pipe *stream_pipe)
+{
+       int ret;
+       ssize_t pipe_len;
+       struct lttng_consumer_stream *stream;
+
+       pipe_len = lttng_pipe_read(stream_pipe, &stream, sizeof(stream));
+       if (pipe_len < sizeof(stream)) {
+               if (pipe_len < 0) {
+                       PERROR("read metadata stream");
+               }
+               ERR("Failed to read stream on metadata rotate pipe");
+               ret = -1;
+               goto end;
+       }
+
+       pthread_mutex_lock(&stream->lock);
+       fprintf(stderr, "Rotate wakeup pipe, stream %lu\n", stream->key);
+       ret = lttng_consumer_rotate_stream(ctx, stream);
+       pthread_mutex_unlock(&stream->lock);
+       if (ret < 0) {
+               ERR("Failed to rotate metadata stream");
+               goto end;
+       }
+       ret = consumer_post_rotation(stream, ctx);
+       if (ret < 0) {
+               ERR("Failed after a rotation");
+               ret = -1;
+       }
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
 /*
  * Thread polls on metadata file descriptor and write them on disk or on the
  * network.
@@ -2300,7 +2429,7 @@ void *consumer_thread_metadata_poll(void *data)
        DBG("Thread metadata poll started");
 
        /* Size is set to 1 for the consumer_metadata pipe */
-       ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
+       ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
        if (ret < 0) {
                ERR("Poll set creation failed");
                goto end_poll;
@@ -2312,6 +2441,12 @@ void *consumer_thread_metadata_poll(void *data)
                goto end;
        }
 
+       ret = lttng_poll_add(&events,
+                       lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe), LPOLLIN);
+       if (ret < 0) {
+               goto end;
+       }
+
        /* Main loop */
        DBG("Metadata main loop started");
 
@@ -2400,6 +2535,35 @@ restart:
 
                                /* Handle other stream */
                                continue;
+                       } else if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe)) {
+                               if (revents & LPOLLIN) {
+                                       ret = handle_rotate_wakeup_pipe(ctx,
+                                                       ctx->consumer_metadata_rotate_pipe);
+                                       if (ret < 0) {
+                                               ERR("Failed to rotate metadata stream");
+                                               lttng_poll_del(&events,
+                                                               lttng_pipe_get_readfd(
+                                                                       ctx->consumer_metadata_rotate_pipe));
+                                               lttng_pipe_read_close(
+                                                               ctx->consumer_metadata_rotate_pipe);
+                                               goto end;
+                                       }
+                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
+                                       DBG("Metadata rotate pipe hung up");
+                                       fprintf(stderr, "Metadata rotate pipe hung up");
+                                       /*
+                                        * Remove the pipe from the poll set and continue the loop
+                                        * since their might be data to consume.
+                                        */
+                                       lttng_poll_del(&events,
+                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe));
+                                       lttng_pipe_read_close(ctx->consumer_metadata_rotate_pipe);
+                                       continue;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto end;
+                               }
+                               continue;
                        }
 
                        rcu_read_lock();
@@ -2503,7 +2667,7 @@ void *consumer_thread_data_poll(void *data)
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
        /* local view of consumer_data.fds_count */
-       int nb_fd = 0;
+       int nb_fd = 0, nb_pipes_fd;
        struct lttng_consumer_local_data *ctx = data;
        ssize_t len;
 
@@ -2542,17 +2706,20 @@ void *consumer_thread_data_poll(void *data)
                        local_stream = NULL;
 
                        /*
-                        * Allocate for all fds +1 for the consumer_data_pipe and +1 for
-                        * wake up pipe.
+                        * Allocate for all fds + 3:
+                        *   +1 for the consumer_data_pipe
+                        *   +1 for wake up pipe
+                        *   +1 for consumer_data_rotate_pipe.
                         */
-                       pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
+                       nb_pipes_fd = 3;
+                       pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
 
-                       local_stream = zmalloc((consumer_data.stream_count + 2) *
+                       local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
                                        sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
@@ -2579,12 +2746,12 @@ void *consumer_thread_data_poll(void *data)
                }
                /* poll on the array of fds */
        restart:
-               DBG("polling on %d fd", nb_fd + 2);
+               DBG("polling on %d fd", nb_fd + nb_pipes_fd);
                if (testpoint(consumerd_thread_data_poll)) {
                        goto end;
                }
                health_poll_entry();
-               num_rdy = poll(pollfd, nb_fd + 2, -1);
+               num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
                health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
@@ -2653,6 +2820,17 @@ void *consumer_thread_data_poll(void *data)
                        ctx->has_wakeup = 0;
                }
 
+               /* Handle consumer_data_rotate_pipe. */
+               if (pollfd[nb_fd + 2].revents & (POLLIN | POLLPRI)) {
+                       fprintf(stderr, "data wakeup pipe\n");
+                       ret = handle_rotate_wakeup_pipe(ctx,
+                                       ctx->consumer_data_rotate_pipe);
+                       if (ret < 0) {
+                               ERR("Failed to rotate metadata stream");
+                               goto end;
+                       }
+               }
+
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
                        health_code_update();
@@ -2771,6 +2949,7 @@ end:
         * the read side.
         */
        (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
+       (void) lttng_pipe_write_close(ctx->consumer_metadata_rotate_pipe);
 
 error_testpoint:
        if (err) {
@@ -3312,71 +3491,6 @@ error_testpoint:
        return NULL;
 }
 
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
-               uint64_t key)
-{
-       int ret;
-
-       do {
-               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
-       } while (ret == -1 && errno == EINTR);
-       if (ret == -1) {
-               PERROR("write to the channel rotate pipe");
-       } else {
-               DBG("Sent channel rotation notification for channel key %"
-                               PRIu64, key);
-       }
-
-       return ret;
-}
-
-/*
- * Perform operations that need to be done after a stream has
- * rotated and released the stream lock.
- *
- * Multiple rotations cannot occur simultaneously, so we know the state of the
- * "rotated" stream flag cannot change.
- *
- * This MUST be called WITHOUT the stream lock held.
- */
-static
-int consumer_post_rotation(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
-{
-       int ret = 0;
-
-       if (!stream->rotated) {
-               goto end;
-       }
-
-       pthread_mutex_lock(&stream->chan->lock);
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               /*
-                * Wakeup the metadata thread so it dumps the metadata cache
-                * to file again.
-                */
-               consumer_metadata_wakeup_pipe(stream->chan);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               abort();
-       }
-
-       if (--stream->chan->nr_stream_rotate_pending == 0) {
-               ret = rotate_notify_sessiond(ctx, stream->chan->key);
-       }
-       pthread_mutex_unlock(&stream->chan->lock);
-       stream->rotated = 0;
-
-end:
-       return ret;
-}
-
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
@@ -3407,6 +3521,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                pthread_cond_broadcast(&stream->metadata_rdv);
                pthread_mutex_unlock(&stream->metadata_rdv_lock);
        }
+       fprintf(stderr, "rotated: %d\n", stream->rotated);
        pthread_mutex_unlock(&stream->lock);
 
        rotate_ret = consumer_post_rotation(stream, ctx);
@@ -3932,7 +4047,7 @@ end:
  * Returns 0 on success, < 0 on error
  */
 int lttng_consumer_rotate_channel(uint64_t key, char *path,
-               uint64_t relayd_id, uint32_t metadata,
+               uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
@@ -3941,7 +4056,7 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path,
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
 
-       DBG("Kernel consumer sample rotate position for channel %" PRIu64, key);
+       DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
        rcu_read_lock();
 
@@ -3952,44 +4067,56 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path,
                goto end;
        }
        pthread_mutex_lock(&channel->lock);
+       channel->current_chunk_id = new_chunk_id;
        snprintf(channel->pathname, PATH_MAX, "%s", path);
+       ret = utils_mkdir_recursive(channel->pathname, S_IRWXU | S_IRWXG,
+                       channel->uid, channel->gid);
+       if (ret < 0) {
+               ERR("Trace directory creation error");
+               ret = -1;
+               pthread_mutex_unlock(&channel->lock);
+               goto end;
+       }
+       pthread_mutex_unlock(&channel->lock);
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
                        ht->match_fct, &channel->key, &iter.iter,
                        stream, node_channel_id.node) {
+               uint64_t consumed_pos;
+
                health_code_update();
 
                /*
                 * Lock stream because we are about to change its state.
                 */
                pthread_mutex_lock(&stream->lock);
+
                memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX);
                ret = lttng_consumer_sample_snapshot_positions(stream);
                if (ret < 0) {
-                       ERR("Taking kernel snapshot positions");
+                       ERR("Taking snapshot positions");
                        goto end_unlock;
-               } else {
-                       uint64_t consumed_pos;
+               }
 
-                       ret = lttng_consumer_get_produced_snapshot(stream,
-                                       &stream->rotate_position);
-                       if (ret < 0) {
-                               ERR("Produced kernel snapshot position");
-                               goto end_unlock;
-                       }
-                       fprintf(stderr, "Stream %lu should rotate after %lu to %s\n",
-                                       stream->key, stream->rotate_position,
-                                       channel->pathname);
-                       lttng_consumer_get_consumed_snapshot(stream,
-                                       &consumed_pos);
-                       fprintf(stderr, "consumed %lu\n", consumed_pos);
-                       if (consumed_pos == stream->rotate_position) {
-                               stream->rotate_ready = 1;
-                               fprintf(stderr, "Stream %lu ready to rotate to %s\n",
-                                               stream->key, channel->pathname);
-                       }
+               ret = lttng_consumer_get_produced_snapshot(stream,
+                               &stream->rotate_position);
+               if (ret < 0) {
+                       ERR("Produced snapshot position");
+                       goto end_unlock;
+               }
+               fprintf(stderr, "Stream %lu should rotate after %lu to %s\n",
+                               stream->key, stream->rotate_position,
+                               channel->pathname);
+               lttng_consumer_get_consumed_snapshot(stream,
+                               &consumed_pos);
+               fprintf(stderr, "consumed %lu\n", consumed_pos);
+               if (consumed_pos == stream->rotate_position) {
+                       stream->rotate_ready = 1;
+                       fprintf(stderr, "Stream %lu ready to rotate to %s\n",
+                                       stream->key, channel->pathname);
                }
+               fprintf(stderr, "before increasinc nr_pending: %lu\n", channel->nr_stream_rotate_pending);
                channel->nr_stream_rotate_pending++;
 
                ret = consumer_flush_buffer(stream, 1);
@@ -4002,25 +4129,22 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path,
        }
 
        ret = 0;
-       goto end_unlock_channel;
+       goto end;
 
 end_unlock:
        pthread_mutex_unlock(&stream->lock);
-end_unlock_channel:
-       pthread_mutex_unlock(&channel->lock);
 end:
        rcu_read_unlock();
        return ret;
 }
 
 /*
- * Performs the stream rotation for the rotate session feature if needed.
- * It must be called with the stream and channel locks held.
+ * Check if a stream is ready to be rotated after extracting it.
  *
- * Return 0 on success, a negative number of error.
+ * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
+ * error. Stream lock must be held.
  */
-int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
 {
        int ret;
        unsigned long consumed_pos;
@@ -4030,35 +4154,61 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                goto end;
        }
 
+       if (stream->rotate_ready) {
+               fprintf(stderr, "Rotate position reached for stream %lu\n",
+                               stream->key);
+               ret = 1;
+               goto end;
+       }
+
        /*
         * If we don't have the rotate_ready flag, check the consumed position
         * to determine if we need to rotate.
         */
-       if (!stream->rotate_ready) {
-               ret = lttng_consumer_sample_snapshot_positions(stream);
-               if (ret < 0) {
-                       ERR("Taking kernel snapshot positions");
-                       goto error;
-               }
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Taking kernel snapshot positions");
+               goto end;
+       }
 
-               ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
-               if (ret < 0) {
-                       ERR("Produced kernel snapshot position");
-                       goto error;
-               }
+       ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
+       if (ret < 0) {
+               ERR("Consumed kernel snapshot position");
+               goto end;
+       }
 
-               fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
-               /* Rotate position not reached yet. */
-               if (consumed_pos < stream->rotate_position) {
-                       ret = 0;
-                       goto end;
-               }
-               fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
-                               consumed_pos, stream->rotate_position, stream->key);
-       } else {
-               fprintf(stderr, "Rotate position reached for stream %lu\n",
-                               stream->key);
+       fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
+       /* Rotate position not reached yet. */
+       if ((long) (consumed_pos - stream->rotate_position) < 0) {
+               ret = 0;
+               goto end;
        }
+       fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
+                       consumed_pos, stream->rotate_position,
+                       stream->key);
+       ret = 1;
+
+end:
+       return ret;
+}
+
+/*
+ * Reset the state for a stream after a rotation occurred.
+ */
+void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
+{
+       stream->rotate_position = 0;
+       stream->rotate_ready = 0;
+       stream->rotated = 1;
+}
+
+/*
+ * Perform the rotation a local stream file.
+ */
+int rotate_local_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
 
        ret = close(stream->out_fd);
        if (ret < 0) {
@@ -4092,7 +4242,65 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                }
                stream->index_file = index_file;
                stream->out_fd_offset = 0;
+       }
+
+       ret = 0;
+       goto end;
+
+error:
+       ret = -1;
+end:
+       return ret;
+
+}
+
+/*
+ * Perform the rotation a stream file on the relay.
+ */
+int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (!relayd) {
+               ERR("Failed to find relayd");
+               ret = -1;
+               goto end;
+       }
+
+       /* FIXME: chan_ro ? */
+       ret = relayd_rotate_stream(&relayd->control_sock,
+                       stream->relayd_stream_id, stream->channel_ro_pathname,
+                       stream->chan->current_chunk_id,
+                       stream->last_sequence_number);
+
+end:
+       return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       if (stream->net_seq_idx != (uint64_t) -1ULL) {
+               ret = rotate_relay_stream(ctx, stream);
        } else {
+               ret = rotate_local_stream(ctx, stream);
+       }
+       if (ret < 0) {
+               goto error;
+       }
+
+       if (stream->metadata_flag) {
                switch (consumer_data.type) {
                case LTTNG_CONSUMER_KERNEL:
                        /*
@@ -4118,17 +4326,11 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                        abort();
                }
        }
-
-       stream->rotate_position = 0;
-       stream->rotate_ready = 0;
-       stream->rotated = 1;
+       lttng_consumer_reset_stream_rotate_state(stream);
 
        ret = 0;
-       goto end;
 
 error:
-       ret = -1;
-end:
        return ret;
 }
 
@@ -4147,6 +4349,7 @@ int lttng_consumer_rotate_ready_streams(uint64_t key,
        int ret;
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
+       struct lttng_pipe *stream_pipe;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
 
@@ -4158,33 +4361,31 @@ int lttng_consumer_rotate_ready_streams(uint64_t key,
                ret = -1;
                goto end;
        }
+
+       if (channel->metadata_stream) {
+               fprintf(stderr, "M\n");
+               stream_pipe = ctx->consumer_metadata_rotate_pipe;
+       } else {
+               fprintf(stderr, "D\n");
+               stream_pipe = ctx->consumer_data_rotate_pipe;
+       }
+
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
                        ht->match_fct, &channel->key, &iter.iter,
                        stream, node_channel_id.node) {
                health_code_update();
 
-               /*
-                * Lock stream because we are about to change its state.
-                */
-               pthread_mutex_lock(&stream->lock);
                if (stream->rotate_ready == 0) {
-                       pthread_mutex_unlock(&stream->lock);
                        continue;
                }
-               ret = lttng_consumer_rotate_stream(ctx, stream);
+               fprintf(stderr, "send stream %lu on wakeup pipe\n", stream->key);
+               ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
                if (ret < 0) {
-                       pthread_mutex_unlock(&stream->lock);
-                       ERR("Stream rotation error");
-                       goto end;
-               }
-
-               pthread_mutex_unlock(&stream->lock);
-               ret = consumer_post_rotation(stream, ctx);
-               if (ret < 0) {
-                       ERR("Failed after a rotation");
+                       ERR("Failed to wakeup consumer rotate pipe");
                        goto end;
                }
+               fprintf(stderr, "done sending stream %lu on wakeup pipe\n", stream->key);
        }
 
        ret = 0;
@@ -4193,3 +4394,82 @@ end:
        rcu_read_unlock();
        return ret;
 }
+
+static
+int rotate_rename_local(char *current_path, char *new_path,
+               uid_t uid, gid_t gid)
+{
+       int ret;
+
+       ret = utils_mkdir_recursive(new_path, S_IRWXU | S_IRWXG,
+                       uid, gid);
+       if (ret < 0) {
+               ERR("Create directory on rotate");
+               goto end;
+       }
+
+       ret = rename(current_path, new_path);
+       /*
+        * If a domain has not yet created its channel, the domain-specific
+        * folder might not exist, but this is not an error.
+        */
+       if (ret < 0 && errno != ENOENT) {
+               PERROR("Rename completed rotation chunk");
+               goto end;
+       }
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
+static
+int rotate_rename_relay(char *current_path, char *new_path, uint64_t relayd_id)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       relayd = consumer_find_relayd(relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd");
+               ret = -1;
+               goto end;
+       }
+
+       ret = relayd_rotate_rename(&relayd->control_sock, current_path, new_path);
+
+end:
+       return ret;
+
+}
+
+int lttng_consumer_rotate_rename(char *current_path, char *new_path,
+               uid_t uid, gid_t gid, uint64_t relayd_id)
+{
+       if (relayd_id != (uint64_t) -1ULL) {
+               return rotate_rename_relay(current_path, new_path, relayd_id);
+       } else {
+               return rotate_rename_local(current_path, new_path, uid, gid);
+       }
+}
+
+int lttng_consumer_rotate_pending_relay(uint64_t session_id,
+               uint64_t relayd_id, uint64_t chunk_id)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       relayd = consumer_find_relayd(relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd");
+               ret = -1;
+               goto end;
+       }
+
+       ret = relayd_rotate_pending(&relayd->control_sock, chunk_id);
+
+end:
+       return ret;
+
+}
This page took 0.036748 seconds and 5 git commands to generate.