* Returns 0 on success, < 0 on error
*/
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
- char *path)
+ char *path, enum lttng_domain_type domain)
{
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_add_stream(&relayd->control_sock, stream->name,
path, &stream->relayd_stream_id,
- stream->chan->tracefile_size, stream->chan->tracefile_count);
+ stream->chan->tracefile_size, stream->chan->tracefile_count,
+ domain);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
goto end;
(*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
(*pollfd)[i + 1].events = POLLIN | POLLPRI;
+
+ (*pollfd)[i + 2].fd = lttng_pipe_get_readfd(ctx->consumer_data_rotate_pipe);
+ (*pollfd)[i + 2].events = POLLIN | POLLPRI;
return i;
}
goto error_wakeup_pipe;
}
+ ctx->consumer_data_rotate_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_data_rotate_pipe) {
+ goto error_data_rotate_pipe;
+ }
+
+ ctx->consumer_metadata_rotate_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_metadata_rotate_pipe) {
+ goto error_metadata_rotate_pipe;
+ }
+
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
PERROR("Error creating recv pipe");
error_channel_pipe:
utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
+ lttng_pipe_destroy(ctx->consumer_metadata_rotate_pipe);
+error_metadata_rotate_pipe:
+ lttng_pipe_destroy(ctx->consumer_data_rotate_pipe);
+error_data_rotate_pipe:
lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
error_wakeup_pipe:
lttng_pipe_destroy(ctx->consumer_data_pipe);
lttng_pipe_destroy(ctx->consumer_data_pipe);
lttng_pipe_destroy(ctx->consumer_metadata_pipe);
lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+ lttng_pipe_destroy(ctx->consumer_data_rotate_pipe);
+ lttng_pipe_destroy(ctx->consumer_metadata_rotate_pipe);
utils_close_pipe(ctx->consumer_should_quit);
unlink(ctx->consumer_command_sock_path);
rcu_read_unlock();
}
+static
+int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
+ uint64_t key)
+{
+ int ret;
+
+ fprintf(stderr, "Notif send\n");
+ do {
+ ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
+ } while (ret == -1 && errno == EINTR);
+ if (ret == -1) {
+ PERROR("write to the channel rotate pipe");
+ } else {
+ DBG("Sent channel rotation notification for channel key %"
+ PRIu64, key);
+ }
+ fprintf(stderr, "Notif done\n");
+
+ return ret;
+}
+
+/*
+ * Perform operations that need to be done after a stream has
+ * rotated and released the stream lock.
+ *
+ * Multiple rotations cannot occur simultaneously, so we know the state of the
+ * "rotated" stream flag cannot change.
+ *
+ * This MUST be called WITHOUT the stream lock held.
+ */
+static
+int consumer_post_rotation(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+
+ if (!stream->rotated) {
+ goto end;
+ }
+
+ pthread_mutex_lock(&stream->chan->lock);
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /*
+ * The ust_metadata_pushed counter has been reset to 0, so now
+ * we can wakeup the metadata thread so it dumps the metadata
+ * cache to the new file.
+ */
+ if (stream->metadata_flag) {
+ consumer_metadata_wakeup_pipe(stream->chan);
+ }
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+
+ fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending);
+ if (--stream->chan->nr_stream_rotate_pending == 0) {
+ ret = rotate_notify_sessiond(ctx, stream->chan->key);
+ }
+ pthread_mutex_unlock(&stream->chan->lock);
+ stream->rotated = 0;
+
+end:
+ return ret;
+}
+
+static
+int handle_rotate_wakeup_pipe(struct lttng_consumer_local_data *ctx,
+ struct lttng_pipe *stream_pipe)
+{
+ int ret;
+ ssize_t pipe_len;
+ struct lttng_consumer_stream *stream;
+
+ pipe_len = lttng_pipe_read(stream_pipe, &stream, sizeof(stream));
+ if (pipe_len < sizeof(stream)) {
+ if (pipe_len < 0) {
+ PERROR("read metadata stream");
+ }
+ ERR("Failed to read stream on metadata rotate pipe");
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&stream->lock);
+ fprintf(stderr, "Rotate wakeup pipe, stream %lu\n", stream->key);
+ ret = lttng_consumer_rotate_stream(ctx, stream);
+ pthread_mutex_unlock(&stream->lock);
+ if (ret < 0) {
+ ERR("Failed to rotate metadata stream");
+ goto end;
+ }
+ ret = consumer_post_rotation(stream, ctx);
+ if (ret < 0) {
+ ERR("Failed after a rotation");
+ ret = -1;
+ }
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
/*
* Thread polls on metadata file descriptor and write them on disk or on the
* network.
DBG("Thread metadata poll started");
/* Size is set to 1 for the consumer_metadata pipe */
- ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
+ ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
if (ret < 0) {
ERR("Poll set creation failed");
goto end_poll;
goto end;
}
+ ret = lttng_poll_add(&events,
+ lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe), LPOLLIN);
+ if (ret < 0) {
+ goto end;
+ }
+
/* Main loop */
DBG("Metadata main loop started");
/* Handle other stream */
continue;
+ } else if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe)) {
+ if (revents & LPOLLIN) {
+ ret = handle_rotate_wakeup_pipe(ctx,
+ ctx->consumer_metadata_rotate_pipe);
+ if (ret < 0) {
+ ERR("Failed to rotate metadata stream");
+ lttng_poll_del(&events,
+ lttng_pipe_get_readfd(
+ ctx->consumer_metadata_rotate_pipe));
+ lttng_pipe_read_close(
+ ctx->consumer_metadata_rotate_pipe);
+ goto end;
+ }
+ } else if (revents & (LPOLLERR | LPOLLHUP)) {
+ DBG("Metadata rotate pipe hung up");
+ fprintf(stderr, "Metadata rotate pipe hung up");
+ /*
+ * Remove the pipe from the poll set and continue the loop
+ * since their might be data to consume.
+ */
+ lttng_poll_del(&events,
+ lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe));
+ lttng_pipe_read_close(ctx->consumer_metadata_rotate_pipe);
+ continue;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto end;
+ }
+ continue;
}
rcu_read_lock();
/* local view of the streams */
struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
/* local view of consumer_data.fds_count */
- int nb_fd = 0;
+ int nb_fd = 0, nb_pipes_fd;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
local_stream = NULL;
/*
- * Allocate for all fds +1 for the consumer_data_pipe and +1 for
- * wake up pipe.
+ * Allocate for all fds + 3:
+ * +1 for the consumer_data_pipe
+ * +1 for wake up pipe
+ * +1 for consumer_data_rotate_pipe.
*/
- pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
+ nb_pipes_fd = 3;
+ pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- local_stream = zmalloc((consumer_data.stream_count + 2) *
+ local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
}
/* poll on the array of fds */
restart:
- DBG("polling on %d fd", nb_fd + 2);
+ DBG("polling on %d fd", nb_fd + nb_pipes_fd);
if (testpoint(consumerd_thread_data_poll)) {
goto end;
}
health_poll_entry();
- num_rdy = poll(pollfd, nb_fd + 2, -1);
+ num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
ctx->has_wakeup = 0;
}
+ /* Handle consumer_data_rotate_pipe. */
+ if (pollfd[nb_fd + 2].revents & (POLLIN | POLLPRI)) {
+ fprintf(stderr, "data wakeup pipe\n");
+ ret = handle_rotate_wakeup_pipe(ctx,
+ ctx->consumer_data_rotate_pipe);
+ if (ret < 0) {
+ ERR("Failed to rotate metadata stream");
+ goto end;
+ }
+ }
+
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
health_code_update();
* the read side.
*/
(void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
+ (void) lttng_pipe_write_close(ctx->consumer_metadata_rotate_pipe);
error_testpoint:
if (err) {
return NULL;
}
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
- uint64_t key)
-{
- int ret;
-
- do {
- ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
- } while (ret == -1 && errno == EINTR);
- if (ret == -1) {
- PERROR("write to the channel rotate pipe");
- } else {
- DBG("Sent channel rotation notification for channel key %"
- PRIu64, key);
- }
-
- return ret;
-}
-
-/*
- * Perform operations that need to be done after a stream has
- * rotated and released the stream lock.
- *
- * Multiple rotations cannot occur simultaneously, so we know the state of the
- * "rotated" stream flag cannot change.
- *
- * This MUST be called WITHOUT the stream lock held.
- */
-static
-int consumer_post_rotation(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
-{
- int ret = 0;
-
- if (!stream->rotated) {
- goto end;
- }
-
- pthread_mutex_lock(&stream->chan->lock);
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- /*
- * Wakeup the metadata thread so it dumps the metadata cache
- * to file again.
- */
- consumer_metadata_wakeup_pipe(stream->chan);
- break;
- default:
- ERR("Unknown consumer_data type");
- abort();
- }
-
- if (--stream->chan->nr_stream_rotate_pending == 0) {
- ret = rotate_notify_sessiond(ctx, stream->chan->key);
- }
- pthread_mutex_unlock(&stream->chan->lock);
- stream->rotated = 0;
-
-end:
- return ret;
-}
-
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
pthread_cond_broadcast(&stream->metadata_rdv);
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
+ fprintf(stderr, "rotated: %d\n", stream->rotated);
pthread_mutex_unlock(&stream->lock);
rotate_ret = consumer_post_rotation(stream, ctx);
* Returns 0 on success, < 0 on error
*/
int lttng_consumer_rotate_channel(uint64_t key, char *path,
- uint64_t relayd_id, uint32_t metadata,
+ uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id,
struct lttng_consumer_local_data *ctx)
{
int ret;
struct lttng_ht_iter iter;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
- DBG("Kernel consumer sample rotate position for channel %" PRIu64, key);
+ DBG("Consumer sample rotate position for channel %" PRIu64, key);
rcu_read_lock();
goto end;
}
pthread_mutex_lock(&channel->lock);
+ channel->current_chunk_id = new_chunk_id;
snprintf(channel->pathname, PATH_MAX, "%s", path);
+ ret = utils_mkdir_recursive(channel->pathname, S_IRWXU | S_IRWXG,
+ channel->uid, channel->gid);
+ if (ret < 0) {
+ ERR("Trace directory creation error");
+ ret = -1;
+ pthread_mutex_unlock(&channel->lock);
+ goto end;
+ }
+ pthread_mutex_unlock(&channel->lock);
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct, &channel->key, &iter.iter,
stream, node_channel_id.node) {
+ uint64_t consumed_pos;
+
health_code_update();
/*
* Lock stream because we are about to change its state.
*/
pthread_mutex_lock(&stream->lock);
+
memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX);
ret = lttng_consumer_sample_snapshot_positions(stream);
if (ret < 0) {
- ERR("Taking kernel snapshot positions");
+ ERR("Taking snapshot positions");
goto end_unlock;
- } else {
- uint64_t consumed_pos;
+ }
- ret = lttng_consumer_get_produced_snapshot(stream,
- &stream->rotate_position);
- if (ret < 0) {
- ERR("Produced kernel snapshot position");
- goto end_unlock;
- }
- fprintf(stderr, "Stream %lu should rotate after %lu to %s\n",
- stream->key, stream->rotate_position,
- channel->pathname);
- lttng_consumer_get_consumed_snapshot(stream,
- &consumed_pos);
- fprintf(stderr, "consumed %lu\n", consumed_pos);
- if (consumed_pos == stream->rotate_position) {
- stream->rotate_ready = 1;
- fprintf(stderr, "Stream %lu ready to rotate to %s\n",
- stream->key, channel->pathname);
- }
+ ret = lttng_consumer_get_produced_snapshot(stream,
+ &stream->rotate_position);
+ if (ret < 0) {
+ ERR("Produced snapshot position");
+ goto end_unlock;
+ }
+ fprintf(stderr, "Stream %lu should rotate after %lu to %s\n",
+ stream->key, stream->rotate_position,
+ channel->pathname);
+ lttng_consumer_get_consumed_snapshot(stream,
+ &consumed_pos);
+ fprintf(stderr, "consumed %lu\n", consumed_pos);
+ if (consumed_pos == stream->rotate_position) {
+ stream->rotate_ready = 1;
+ fprintf(stderr, "Stream %lu ready to rotate to %s\n",
+ stream->key, channel->pathname);
}
+ fprintf(stderr, "before increasinc nr_pending: %lu\n", channel->nr_stream_rotate_pending);
channel->nr_stream_rotate_pending++;
ret = consumer_flush_buffer(stream, 1);
}
ret = 0;
- goto end_unlock_channel;
+ goto end;
end_unlock:
pthread_mutex_unlock(&stream->lock);
-end_unlock_channel:
- pthread_mutex_unlock(&channel->lock);
end:
rcu_read_unlock();
return ret;
}
/*
- * Performs the stream rotation for the rotate session feature if needed.
- * It must be called with the stream and channel locks held.
+ * Check if a stream is ready to be rotated after extracting it.
*
- * Return 0 on success, a negative number of error.
+ * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
+ * error. Stream lock must be held.
*/
-int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
{
int ret;
unsigned long consumed_pos;
goto end;
}
+ if (stream->rotate_ready) {
+ fprintf(stderr, "Rotate position reached for stream %lu\n",
+ stream->key);
+ ret = 1;
+ goto end;
+ }
+
/*
* If we don't have the rotate_ready flag, check the consumed position
* to determine if we need to rotate.
*/
- if (!stream->rotate_ready) {
- ret = lttng_consumer_sample_snapshot_positions(stream);
- if (ret < 0) {
- ERR("Taking kernel snapshot positions");
- goto error;
- }
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Taking kernel snapshot positions");
+ goto end;
+ }
- ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
- if (ret < 0) {
- ERR("Produced kernel snapshot position");
- goto error;
- }
+ ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
+ if (ret < 0) {
+ ERR("Consumed kernel snapshot position");
+ goto end;
+ }
- fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
- /* Rotate position not reached yet. */
- if (consumed_pos < stream->rotate_position) {
- ret = 0;
- goto end;
- }
- fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
- consumed_pos, stream->rotate_position, stream->key);
- } else {
- fprintf(stderr, "Rotate position reached for stream %lu\n",
- stream->key);
+ fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
+ /* Rotate position not reached yet. */
+ if ((long) (consumed_pos - stream->rotate_position) < 0) {
+ ret = 0;
+ goto end;
}
+ fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
+ consumed_pos, stream->rotate_position,
+ stream->key);
+ ret = 1;
+
+end:
+ return ret;
+}
+
+/*
+ * Reset the state for a stream after a rotation occurred.
+ */
+void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
+{
+ stream->rotate_position = 0;
+ stream->rotate_ready = 0;
+ stream->rotated = 1;
+}
+
+/*
+ * Perform the rotation a local stream file.
+ */
+int rotate_local_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
ret = close(stream->out_fd);
if (ret < 0) {
}
stream->index_file = index_file;
stream->out_fd_offset = 0;
+ }
+
+ ret = 0;
+ goto end;
+
+error:
+ ret = -1;
+end:
+ return ret;
+
+}
+
+/*
+ * Perform the rotation a stream file on the relay.
+ */
+int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (!relayd) {
+ ERR("Failed to find relayd");
+ ret = -1;
+ goto end;
+ }
+
+ /* FIXME: chan_ro ? */
+ ret = relayd_rotate_stream(&relayd->control_sock,
+ stream->relayd_stream_id, stream->channel_ro_pathname,
+ stream->chan->current_chunk_id,
+ stream->last_sequence_number);
+
+end:
+ return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ ret = rotate_relay_stream(ctx, stream);
} else {
+ ret = rotate_local_stream(ctx, stream);
+ }
+ if (ret < 0) {
+ goto error;
+ }
+
+ if (stream->metadata_flag) {
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
/*
abort();
}
}
-
- stream->rotate_position = 0;
- stream->rotate_ready = 0;
- stream->rotated = 1;
+ lttng_consumer_reset_stream_rotate_state(stream);
ret = 0;
- goto end;
error:
- ret = -1;
-end:
return ret;
}
int ret;
struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
+ struct lttng_pipe *stream_pipe;
struct lttng_ht_iter iter;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
ret = -1;
goto end;
}
+
+ if (channel->metadata_stream) {
+ fprintf(stderr, "M\n");
+ stream_pipe = ctx->consumer_metadata_rotate_pipe;
+ } else {
+ fprintf(stderr, "D\n");
+ stream_pipe = ctx->consumer_data_rotate_pipe;
+ }
+
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct, &channel->key, &iter.iter,
stream, node_channel_id.node) {
health_code_update();
- /*
- * Lock stream because we are about to change its state.
- */
- pthread_mutex_lock(&stream->lock);
if (stream->rotate_ready == 0) {
- pthread_mutex_unlock(&stream->lock);
continue;
}
- ret = lttng_consumer_rotate_stream(ctx, stream);
+ fprintf(stderr, "send stream %lu on wakeup pipe\n", stream->key);
+ ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
- pthread_mutex_unlock(&stream->lock);
- ERR("Stream rotation error");
- goto end;
- }
-
- pthread_mutex_unlock(&stream->lock);
- ret = consumer_post_rotation(stream, ctx);
- if (ret < 0) {
- ERR("Failed after a rotation");
+ ERR("Failed to wakeup consumer rotate pipe");
goto end;
}
+ fprintf(stderr, "done sending stream %lu on wakeup pipe\n", stream->key);
}
ret = 0;
rcu_read_unlock();
return ret;
}
+
+static
+int rotate_rename_local(char *current_path, char *new_path,
+ uid_t uid, gid_t gid)
+{
+ int ret;
+
+ ret = utils_mkdir_recursive(new_path, S_IRWXU | S_IRWXG,
+ uid, gid);
+ if (ret < 0) {
+ ERR("Create directory on rotate");
+ goto end;
+ }
+
+ ret = rename(current_path, new_path);
+ /*
+ * If a domain has not yet created its channel, the domain-specific
+ * folder might not exist, but this is not an error.
+ */
+ if (ret < 0 && errno != ENOENT) {
+ PERROR("Rename completed rotation chunk");
+ goto end;
+ }
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
+static
+int rotate_rename_relay(char *current_path, char *new_path, uint64_t relayd_id)
+{
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd");
+ ret = -1;
+ goto end;
+ }
+
+ ret = relayd_rotate_rename(&relayd->control_sock, current_path, new_path);
+
+end:
+ return ret;
+
+}
+
+int lttng_consumer_rotate_rename(char *current_path, char *new_path,
+ uid_t uid, gid_t gid, uint64_t relayd_id)
+{
+ if (relayd_id != (uint64_t) -1ULL) {
+ return rotate_rename_relay(current_path, new_path, relayd_id);
+ } else {
+ return rotate_rename_local(current_path, new_path, uid, gid);
+ }
+}
+
+int lttng_consumer_rotate_pending_relay(uint64_t session_id,
+ uint64_t relayd_id, uint64_t chunk_id)
+{
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd");
+ ret = -1;
+ goto end;
+ }
+
+ ret = relayd_rotate_pending(&relayd->control_sock, chunk_id);
+
+end:
+ return ret;
+
+}