X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=27e96007c920a94ef94aaae8fefa21d7aa771ab8;hp=820427aa99ed168d24f30cb28856fa87a6a4f950;hb=d9f0c7c7ce520c309f8e7148ecaff9aacbe52b55;hpb=00fb02ace5151a6546f4e97e5439512913a50e68 diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 820427aa9..27e96007c 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -323,6 +323,7 @@ static void free_relayd_rcu(struct rcu_head *head) (void) relayd_close(&relayd->control_sock); (void) relayd_close(&relayd->data_sock); + pthread_mutex_destroy(&relayd->ctrl_sock_mutex); free(relayd); } @@ -464,14 +465,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, * If a local data context is available, notify the threads that the streams' * state have changed. */ -static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, - struct lttng_consumer_local_data *ctx) +void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd) { uint64_t netidx; assert(relayd); - DBG("Cleaning up relayd sockets"); + DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx); /* Save the net sequence index before destroying the object */ netidx = relayd->net_seq_idx; @@ -491,10 +491,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, * memory barrier ordering the updates of the end point status from the * read of this status which happens AFTER receiving this notify. */ - if (ctx) { - notify_thread_lttng_pipe(ctx->consumer_data_pipe); - notify_thread_lttng_pipe(ctx->consumer_metadata_pipe); - } + notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe); + notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe); } /* @@ -541,6 +539,16 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) consumer_stream_destroy(stream, metadata_ht); } +void consumer_stream_update_channel_attributes( + struct lttng_consumer_stream *stream, + struct lttng_consumer_channel *channel) +{ + stream->channel_read_only_attributes.tracefile_size = + channel->tracefile_size; + memcpy(stream->channel_read_only_attributes.path, channel->pathname, + sizeof(stream->channel_read_only_attributes.path)); +} + struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, @@ -552,7 +560,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor) + unsigned int monitor, + uint64_t trace_archive_id) { int ret; struct lttng_consumer_stream *stream; @@ -579,6 +588,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_file = NULL; stream->last_sequence_number = -1ULL; + stream->trace_archive_id = trace_archive_id; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -796,9 +806,12 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_add_stream(&relayd->control_sock, stream->name, path, &stream->relayd_stream_id, - stream->chan->tracefile_size, stream->chan->tracefile_count); + stream->chan->tracefile_size, stream->chan->tracefile_count, + stream->trace_archive_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } @@ -840,6 +853,8 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) ret = relayd_streams_sent(&relayd->control_sock); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } } else { @@ -1465,7 +1480,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) */ static int write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, - struct consumer_relayd_sock_pair *relayd, unsigned long padding) + unsigned long padding) { ssize_t ret; struct lttcomm_relayd_metadata_payload hdr; @@ -1600,7 +1615,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* Write metadata stream id before payload */ if (stream->metadata_flag) { - ret = write_relayd_metadata_id(outfd, stream, relayd, padding); + ret = write_relayd_metadata_id(outfd, stream, padding); if (ret < 0) { relayd_hang_up = 1; goto write_error; @@ -1707,7 +1722,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); } end: @@ -1789,7 +1805,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( } stream->reset_metadata_flag = 0; } - ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd, + ret = write_relayd_metadata_id(splice_pipe[1], stream, padding); if (ret < 0) { written = ret; @@ -1933,7 +1949,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); /* Skip splice error so the consumer does not fail */ goto end; } @@ -1961,6 +1978,25 @@ end: return written; } +/* + * Sample the snapshot positions for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_sample_snapshot_positions(stream); + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + return lttng_ustconsumer_sample_snapshot_positions(stream); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} /* * Take a snapshot for a specific fd * @@ -2002,6 +2038,27 @@ int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, } } +/* + * Get the consumed position (free-running counter position in bytes). + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_get_consumed_snapshot(stream, pos); + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + return lttng_ustconsumer_get_consumed_snapshot(stream, pos); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} + int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -2226,6 +2283,72 @@ static void validate_endpoint_status_metadata_stream( rcu_read_unlock(); } +static +int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, + uint64_t key) +{ + ssize_t ret; + + do { + ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + PERROR("Failed to write to the channel rotation pipe"); + } else { + DBG("Sent channel rotation notification for channel key %" + PRIu64, key); + ret = 0; + } + + return (int) ret; +} + +/* + * Perform operations that need to be done after a stream has + * rotated and released the stream lock. + * + * Multiple rotations cannot occur simultaneously, so we know the state of the + * "rotated" stream flag cannot change. + * + * This MUST be called WITHOUT the stream lock held. + */ +static +int consumer_post_rotation(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + int ret = 0; + + pthread_mutex_lock(&stream->chan->lock); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * The ust_metadata_pushed counter has been reset to 0, so now + * we can wakeup the metadata thread so it dumps the metadata + * cache to the new file. + */ + if (stream->metadata_flag) { + consumer_metadata_wakeup_pipe(stream->chan); + } + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + + if (--stream->chan->nr_stream_rotate_pending == 0) { + DBG("Rotation of channel \"%s\" completed, notifying the session daemon", + stream->chan->name); + ret = rotate_notify_sessiond(ctx, stream->chan->key); + } + pthread_mutex_unlock(&stream->chan->lock); + + return ret; +} + /* * Thread polls on metadata file descriptor and write them on disk or on the * network. @@ -2458,6 +2581,8 @@ void *consumer_thread_data_poll(void *data) struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; + /* 2 for the consumer_data_pipe and wake up pipe */ + const int nb_pipes_fd = 2; /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */ int nb_inactive_fd = 0; struct lttng_consumer_local_data *ctx = data; @@ -2497,18 +2622,15 @@ void *consumer_thread_data_poll(void *data) free(local_stream); local_stream = NULL; - /* - * Allocate for all fds +1 for the consumer_data_pipe and +1 for - * wake up pipe. - */ - pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd)); + /* Allocate for all fds */ + pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); pthread_mutex_unlock(&consumer_data.lock); goto end; } - local_stream = zmalloc((consumer_data.stream_count + 2) * + local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct lttng_consumer_stream *)); if (local_stream == NULL) { PERROR("local_stream malloc"); @@ -2536,12 +2658,12 @@ void *consumer_thread_data_poll(void *data) } /* poll on the array of fds */ restart: - DBG("polling on %d fd", nb_fd + 2); + DBG("polling on %d fd", nb_fd + nb_pipes_fd); if (testpoint(consumerd_thread_data_poll)) { goto end; } health_poll_entry(); - num_rdy = poll(pollfd, nb_fd + 2, -1); + num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1); health_poll_exit(); DBG("poll num_rdy : %d", num_rdy); if (num_rdy == -1) { @@ -3273,6 +3395,8 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { ssize_t ret; + int rotate_ret; + bool rotated = false; pthread_mutex_lock(&stream->lock); if (stream->metadata_flag) { @@ -3281,11 +3405,11 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - ret = lttng_kconsumer_read_subbuffer(stream, ctx); + ret = lttng_kconsumer_read_subbuffer(stream, ctx, &rotated); break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_read_subbuffer(stream, ctx); + ret = lttng_ustconsumer_read_subbuffer(stream, ctx, &rotated); break; default: ERR("Unknown consumer_data type"); @@ -3299,6 +3423,14 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->metadata_rdv_lock); } pthread_mutex_unlock(&stream->lock); + if (rotated) { + rotate_ret = consumer_post_rotation(stream, ctx); + if (rotate_ret < 0) { + ERR("Failed after a rotation"); + ret = -1; + } + } + return ret; } @@ -3517,6 +3649,7 @@ error: * Add relayd socket pair to consumer data hashtable. If object already * exists or on error, the function gracefully returns. */ + relayd->ctx = ctx; add_relayd(relayd); /* All good! */ @@ -3540,34 +3673,6 @@ error_nosignal: } } -/* - * Try to lock the stream mutex. - * - * On success, 1 is returned else 0 indicating that the mutex is NOT lock. - */ -static int stream_try_lock(struct lttng_consumer_stream *stream) -{ - int ret; - - assert(stream); - - /* - * Try to lock the stream mutex. On failure, we know that the stream is - * being used else where hence there is data still being extracted. - */ - ret = pthread_mutex_trylock(&stream->lock); - if (ret) { - /* For both EBUSY and EINVAL error, the mutex is NOT locked. */ - ret = 0; - goto end; - } - - ret = 1; - -end: - return ret; -} - /* * Search for a relayd associated to the session id and return the reference. * @@ -3634,28 +3739,11 @@ int consumer_data_pending(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; - relayd = find_relayd_by_session_id(id); - if (relayd) { - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_begin_data_pending(&relayd->control_sock, - relayd->relayd_session_id); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - /* Communication error thus the relayd so no data pending. */ - goto data_not_pending; - } - } - cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct, &id, &iter.iter, stream, node_session_id.node) { - /* If this call fails, the stream is being used hence data pending. */ - ret = stream_try_lock(stream); - if (!ret) { - goto data_pending; - } + pthread_mutex_lock(&stream->lock); /* * A removed node from the hash table indicates that the stream has @@ -3673,9 +3761,27 @@ int consumer_data_pending(uint64_t id) } } - /* Relayd check */ - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + } + + relayd = find_relayd_by_session_id(id); + if (relayd) { + unsigned int is_data_inflight = 0; + + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_begin_data_pending(&relayd->control_sock, + relayd->relayd_session_id); + if (ret < 0) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + /* Communication error thus the relayd so no data pending. */ + goto data_not_pending; + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, &id, + &iter.iter, stream, node_session_id.node) { if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock, stream->relayd_stream_id); @@ -3684,24 +3790,25 @@ int consumer_data_pending(uint64_t id) stream->relayd_stream_id, stream->next_net_seq_num - 1); } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret == 1) { - pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto data_pending; + } else if (ret < 0) { + ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + goto data_not_pending; } } - pthread_mutex_unlock(&stream->lock); - } - if (relayd) { - unsigned int is_data_inflight = 0; - - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + /* Send end command for data pending. */ ret = relayd_end_data_pending(&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } if (is_data_inflight) { @@ -3784,6 +3891,429 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, return start_pos; } +static +int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active) +{ + int ret = 0; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustctl_flush_buffer(stream, producer_active); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + +end: + return ret; +} + +/* + * Sample the rotate position for all the streams of a channel. If a stream + * is already at the rotate position (produced == consumed), we flag it as + * ready for rotation. The rotation of ready streams occurs after we have + * replied to the session daemon that we have finished sampling the positions. + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_rotate_channel(uint64_t key, const char *path, + uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht_iter iter; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + DBG("Consumer sample rotate position for channel %" PRIu64, key); + + rcu_read_lock(); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %" PRIu64, key); + ret = -1; + goto end; + } + + pthread_mutex_lock(&channel->lock); + channel->current_chunk_id = new_chunk_id; + + ret = lttng_strncpy(channel->pathname, path, sizeof(channel->pathname)); + if (ret) { + ERR("Failed to copy new path to channel during channel rotation"); + ret = -1; + goto end_unlock_channel; + } + + if (relayd_id == -1ULL) { + /* + * The domain path (/ust or /kernel) has been created before, we + * now need to create the last part of the path: the application/user + * specific section (uid/1000/64-bit). + */ + ret = utils_mkdir_recursive(channel->pathname, S_IRWXU | S_IRWXG, + channel->uid, channel->gid); + if (ret < 0) { + ERR("Failed to create trace directory at %s during rotation", + channel->pathname); + ret = -1; + goto end_unlock_channel; + } + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + unsigned long consumed_pos; + + health_code_update(); + + /* + * Lock stream because we are about to change its state. + */ + pthread_mutex_lock(&stream->lock); + + ret = lttng_strncpy(stream->channel_read_only_attributes.path, + channel->pathname, + sizeof(stream->channel_read_only_attributes.path)); + if (ret) { + ERR("Failed to sample channel path name during channel rotation"); + goto end_unlock_stream; + } + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to sample snapshot position during channel rotation"); + goto end_unlock_stream; + } + + ret = lttng_consumer_get_produced_snapshot(stream, + &stream->rotate_position); + if (ret < 0) { + ERR("Failed to sample produced position during channel rotation"); + goto end_unlock_stream; + } + + lttng_consumer_get_consumed_snapshot(stream, + &consumed_pos); + if (consumed_pos == stream->rotate_position) { + stream->rotate_ready = true; + } + channel->nr_stream_rotate_pending++; + + ret = consumer_flush_buffer(stream, 1); + if (ret < 0) { + ERR("Failed to flush stream %" PRIu64 " during channel rotation", + stream->key); + goto end_unlock_stream; + } + + pthread_mutex_unlock(&stream->lock); + } + pthread_mutex_unlock(&channel->lock); + + ret = 0; + goto end; + +end_unlock_stream: + pthread_mutex_unlock(&stream->lock); +end_unlock_channel: + pthread_mutex_unlock(&channel->lock); +end: + rcu_read_unlock(); + return ret; +} + +/* + * Check if a stream is ready to be rotated after extracting it. + * + * Return 1 if it is ready for rotation, 0 if it is not, a negative value on + * error. Stream lock must be held. + */ +int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream) +{ + int ret; + unsigned long consumed_pos; + + if (!stream->rotate_position && !stream->rotate_ready) { + ret = 0; + goto end; + } + + if (stream->rotate_ready) { + ret = 1; + goto end; + } + + /* + * If we don't have the rotate_ready flag, check the consumed position + * to determine if we need to rotate. + */ + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Taking snapshot positions"); + goto end; + } + + ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret < 0) { + ERR("Consumed snapshot position"); + goto end; + } + + /* Rotate position not reached yet (with check for overflow). */ + if ((long) (consumed_pos - stream->rotate_position) < 0) { + ret = 0; + goto end; + } + ret = 1; + +end: + return ret; +} + +/* + * Reset the state for a stream after a rotation occurred. + */ +void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream) +{ + stream->rotate_position = 0; + stream->rotate_ready = false; +} + +/* + * Perform the rotation a local stream file. + */ +int rotate_local_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret; + + DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64 " at path %s", + stream->key, + stream->chan->key, + stream->channel_read_only_attributes.path); + + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Closing trace file (fd %d), stream %" PRIu64, + stream->out_fd, stream->key); + assert(0); + goto error; + } + + ret = utils_create_stream_file( + stream->channel_read_only_attributes.path, + stream->name, + stream->channel_read_only_attributes.tracefile_size, + stream->tracefile_count_current, + stream->uid, stream->gid, NULL); + if (ret < 0) { + ERR("Rotate create stream file"); + goto error; + } + stream->out_fd = ret; + stream->tracefile_size_current = 0; + + if (!stream->metadata_flag) { + struct lttng_index_file *index_file; + + lttng_index_file_put(stream->index_file); + + index_file = lttng_index_file_create( + stream->channel_read_only_attributes.path, + stream->name, stream->uid, stream->gid, + stream->channel_read_only_attributes.tracefile_size, + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!index_file) { + ERR("Create index file during rotation"); + goto error; + } + stream->index_file = index_file; + stream->out_fd_offset = 0; + } + + ret = 0; + goto end; + +error: + ret = -1; +end: + return ret; + +} + +/* + * Perform the rotation a stream file on the relay. + */ +int rotate_relay_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + DBG("Rotate relay stream"); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (!relayd) { + ERR("Failed to find relayd"); + ret = -1; + goto end; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_rotate_stream(&relayd->control_sock, + stream->relayd_stream_id, + stream->channel_read_only_attributes.path, + stream->chan->current_chunk_id, + stream->last_sequence_number); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } + if (ret) { + ERR("Rotate relay stream"); + } + +end: + return ret; +} + +/* + * Performs the stream rotation for the rotate session feature if needed. + * It must be called with the stream lock held. + * + * Return 0 on success, a negative number of error. + */ +int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, bool *rotated) +{ + int ret; + + DBG("Consumer rotate stream %" PRIu64, stream->key); + + if (stream->net_seq_idx != (uint64_t) -1ULL) { + ret = rotate_relay_stream(ctx, stream); + } else { + ret = rotate_local_stream(ctx, stream); + } + if (ret < 0) { + ERR("Rotate stream"); + goto error; + } + + if (stream->metadata_flag) { + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + /* + * Reset the position of what has been read from the metadata + * cache to 0 so we can dump it again. + */ + ret = kernctl_metadata_cache_dump(stream->wait_fd); + if (ret < 0) { + ERR("Failed to dump the kernel metadata cache after rotation"); + goto error; + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Reset the position pushed from the metadata cache so it + * will write from the beginning on the next push. + */ + stream->ust_metadata_pushed = 0; + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + } + lttng_consumer_reset_stream_rotate_state(stream); + + if (rotated) { + *rotated = true; + } + + ret = 0; + +error: + return ret; +} + +/* + * Rotate all the ready streams now. + * + * This is especially important for low throughput streams that have already + * been consumed, we cannot wait for their next packet to perform the + * rotation. + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_rotate_ready_streams(uint64_t key, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht_iter iter; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + rcu_read_lock(); + + DBG("Consumer rotate ready streams in channel %" PRIu64, key); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %" PRIu64, key); + ret = -1; + goto end; + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + health_code_update(); + + pthread_mutex_lock(&stream->lock); + + if (!stream->rotate_ready) { + pthread_mutex_unlock(&stream->lock); + continue; + } + DBG("Consumer rotate ready stream %" PRIu64, stream->key); + + ret = lttng_consumer_rotate_stream(ctx, stream, NULL); + pthread_mutex_unlock(&stream->lock); + if (ret) { + goto end; + } + + ret = consumer_post_rotation(stream, ctx); + if (ret) { + goto end; + } + } + + ret = 0; + +end: + rcu_read_unlock(); + return ret; +} + static int rotate_rename_local(const char *old_path, const char *new_path, uid_t uid, gid_t gid) @@ -3826,6 +4356,10 @@ int rotate_rename_relay(const char *old_path, const char *new_path, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path); + if (ret < 0) { + ERR("Relayd rotate rename failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); end: return ret; @@ -3841,6 +4375,31 @@ int lttng_consumer_rotate_rename(const char *old_path, const char *new_path, } } +int lttng_consumer_rotate_pending_relay(uint64_t session_id, + uint64_t relayd_id, uint64_t chunk_id) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(relayd_id); + if (!relayd) { + ERR("Failed to find relayd"); + ret = -1; + goto end; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_rotate_pending(&relayd->control_sock, chunk_id); + if (ret < 0) { + ERR("Relayd rotate pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + +end: + return ret; +} + static int mkdir_local(const char *path, uid_t uid, gid_t gid) { @@ -3872,6 +4431,10 @@ int mkdir_relay(const char *path, uint64_t relayd_id) pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_mkdir(&relayd->control_sock, path); + if (ret < 0) { + ERR("Relayd mkdir failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); end: