From: Julien Desfossez Date: Wed, 26 Jul 2017 18:54:59 +0000 (-0400) Subject: refactor: put in common all consumer code X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=commitdiff_plain;h=6af2ba3b2621ead01efa98c6e2f76713d68cf5db refactor: put in common all consumer code Signed-off-by: Julien Desfossez --- diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index b362d022f..85e2f1a7d 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -6279,73 +6279,6 @@ int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess) return 0; } -static -int regenerate_per_pid_metadata(struct ltt_ust_session *usess, - struct ust_app *app, - struct ust_registry_session *registry) -{ - int ret; - struct ust_registry_channel *chan; - struct lttng_ht_iter iter_chan; - - pthread_mutex_lock(®istry->lock); - registry->metadata_len_sent = 0; - memset(registry->metadata, 0, registry->metadata_alloc_len); - registry->metadata_len = 0; - registry->metadata_version++; - - fprintf(stderr, "PER_PID REGEN %d\n", registry->metadata_fd); -#if 0 - if (registry->metadata_fd > 0) { - /* Clear the metadata file's content. */ - ret = clear_metadata_file(registry->metadata_fd); - if (ret) { - pthread_mutex_unlock(®istry->lock); - goto end; - } - } -#endif - - ret = ust_metadata_session_statedump(registry, app, - registry->major, registry->minor); - if (ret) { - pthread_mutex_unlock(®istry->lock); - ERR("Failed to generate session metadata (err = %d)", - ret); - goto end; - } - cds_lfht_for_each_entry(registry->channels->ht, &iter_chan.iter, - chan, node.node) { - struct ust_registry_event *event; - struct lttng_ht_iter iter_event; - - ret = ust_metadata_channel_statedump(registry, chan); - if (ret) { - pthread_mutex_unlock(®istry->lock); - ERR("Failed to generate channel metadata " - "(err = %d)", ret); - goto end; - } - cds_lfht_for_each_entry(chan->ht->ht, &iter_event.iter, - event, node.node) { - ret = ust_metadata_event_statedump(registry, - chan, event); - if (ret) { - pthread_mutex_unlock(®istry->lock); - ERR("Failed to generate event metadata " - "(err = %d)", ret); - goto end; - } - } - } - pthread_mutex_unlock(®istry->lock); - - ret = 0; - -end: - return ret; -} - /* * Rotate all the channels of a session. * diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index a8c369904..90e632dc3 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -541,6 +541,13 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) consumer_stream_destroy(stream, metadata_ht); } +void consumer_stream_copy_ro_channel_values(struct lttng_consumer_stream *stream, + struct lttng_consumer_channel *channel) +{ + stream->channel_ro_tracefile_size = channel->tracefile_size; + memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX); +} + struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, @@ -1958,6 +1965,25 @@ end: return written; } +/* + * Sample the snapshot positions for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_sample_snapshot_positions(stream); + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + return lttng_ustconsumer_sample_snapshot_positions(stream); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} /* * Take a snapshot for a specific fd * @@ -1999,6 +2025,27 @@ int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, } } +/* + * Get the consumed position + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_get_consumed_snapshot(stream, pos); + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + return lttng_ustconsumer_get_consumed_snapshot(stream, pos); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} + int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -3779,3 +3826,320 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, } return start_pos; } + +static +int flush_buffer(struct lttng_consumer_stream *stream, int producer_active) +{ + int ret = 0; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustctl_flush_buffer(stream, producer_active); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + +end: + return ret; +} + +/* + * Sample the rotate position for all the streams of a channel. + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_rotate_channel(uint64_t key, char *path, + uint64_t relayd_id, uint32_t metadata, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht_iter iter; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + DBG("Kernel consumer sample rotate position for channel %" PRIu64, key); + + rcu_read_lock(); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %" PRIu64, key); + ret = -1; + goto end; + } + pthread_mutex_lock(&channel->lock); + snprintf(channel->pathname, PATH_MAX, "%s", path); + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + health_code_update(); + + /* + * Lock stream because we are about to change its state. + */ + pthread_mutex_lock(&stream->lock); + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Taking kernel snapshot positions"); + goto end_unlock; + } else { + uint64_t consumed_pos; + + ret = lttng_consumer_get_produced_snapshot(stream, + &stream->rotate_position); + if (ret < 0) { + ERR("Produced kernel snapshot position"); + goto end_unlock; + } + fprintf(stderr, "Stream %lu should rotate after %lu to %s\n", + stream->key, stream->rotate_position, + channel->pathname); + lttng_consumer_get_consumed_snapshot(stream, + &consumed_pos); + fprintf(stderr, "consumed %lu\n", consumed_pos); + if (consumed_pos == stream->rotate_position) { + stream->rotate_ready = 1; + fprintf(stderr, "Stream %lu ready to rotate to %s\n", + stream->key, channel->pathname); + } + } + channel->nr_stream_rotate_pending++; + + ret = flush_buffer(stream, 1); + if (ret < 0) { + ERR("Failed to flush stream"); + goto end_unlock; + } + + pthread_mutex_unlock(&stream->lock); + } + + ret = 0; + goto end_unlock_channel; + +end_unlock: + pthread_mutex_unlock(&stream->lock); +end_unlock_channel: + pthread_mutex_unlock(&channel->lock); +end: + rcu_read_unlock(); + return ret; +} + +static +int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, + uint64_t key) +{ + int ret; + + do { + ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + PERROR("write to the channel rotate pipe"); + } else { + DBG("Sent channel rotation notification for channel key %" + PRIu64, key); + } + + return ret; +} + +/* + * Performs the stream rotation for the rotate session feature if needed. + * It must be called with the stream and channel locks held. + * + * Return 0 on success, a negative number of error. + */ +int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret; + unsigned long consumed_pos; + + if (!stream->rotate_position && !stream->rotate_ready) { + ret = 0; + goto end; + } + + /* + * If we don't have the rotate_ready flag, check the consumed position + * to determine if we need to rotate. + */ + if (!stream->rotate_ready) { + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Taking kernel snapshot positions"); + goto error; + } + + ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret < 0) { + ERR("Produced kernel snapshot position"); + goto error; + } + + fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos); + /* Rotate position not reached yet. */ + if (consumed_pos < stream->rotate_position) { + ret = 0; + goto end; + } + fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n", + consumed_pos, stream->rotate_position, stream->key); + } else { + fprintf(stderr, "Rotate position reached for stream %lu\n", + stream->key); + } + + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Closing tracefile"); + goto error; + } + + fprintf(stderr, "Rotating stream %lu to %s/%s\n", stream->key, + stream->chan->pathname, stream->name); + ret = utils_create_stream_file(stream->chan->pathname, stream->name, + stream->chan->tracefile_size, stream->tracefile_count_current, + stream->uid, stream->gid, NULL); + if (ret < 0) { + goto error; + } + stream->out_fd = ret; + stream->tracefile_size_current = 0; + + if (!stream->metadata_flag) { + struct lttng_index_file *index_file; + + lttng_index_file_put(stream->index_file); + + index_file = lttng_index_file_create(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!index_file) { + goto error; + } + stream->index_file = index_file; + stream->out_fd_offset = 0; + } else { + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + /* + * Reset the position of what has been read from the metadata + * cache to 0 so we can dump it again. + */ + ret = kernctl_metadata_cache_dump(stream->wait_fd); + if (ret < 0) { + ERR("Failed to dump the metadata cache after rotation"); + goto error; + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Reset the position pushed from the metadata cache so it + * will write from the beginning on the next push. + */ + stream->ust_metadata_pushed = 0; + /* + * Wakeup the metadata thread so it dumps the metadata cache + * to file again. + * FIXME: post-pone that after we have released the stream lock. + */ + consumer_metadata_wakeup_pipe(stream->chan); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + } + + stream->rotate_position = 0; + stream->rotate_ready = 0; + + if (--stream->chan->nr_stream_rotate_pending == 0) { + rotate_notify_sessiond(ctx, stream->chan->key); + fprintf(stderr, "SENT %lu\n", stream->chan->key); + } + + ret = 0; + goto end; + +error: + ret = -1; +end: + return ret; +} + +/* + * Rotate all the ready streams. + * + * This is especially important for low throughput streams that have already + * been consumed, we cannot wait for their next packet to perform the + * rotation. + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_rotate_ready_streams(uint64_t key, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht_iter iter; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + rcu_read_lock(); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %" PRIu64, key); + ret = -1; + goto end; + } + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + health_code_update(); + + /* + * Lock stream because we are about to change its state. + */ + pthread_mutex_lock(&stream->lock); + if (stream->rotate_ready == 0) { + pthread_mutex_unlock(&stream->lock); + continue; + } + ret = lttng_consumer_rotate_stream(ctx, stream); + if (ret < 0) { + pthread_mutex_unlock(&stream->lock); + ERR("Stream rotation error"); + goto end; + } + + pthread_mutex_unlock(&stream->lock); + } + + ret = 0; + +end: + rcu_read_unlock(); + return ret; +} diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index d54520eca..77256c146 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -429,6 +429,15 @@ struct lttng_consumer_stream { */ uint64_t rotate_position; + /* + * Read-only copies of channel values. We cannot safely access the + * channel from a stream, so we need to have a local copy of these + * fields in the stream object. These fields should be removed from + * the stream objects when we introduce refcounting. + */ + char channel_ro_pathname[PATH_MAX]; + uint64_t channel_ro_tracefile_size; + /* * If rotate_ready is set to 1, rotate the stream the next time data * need to be extracted, regardless of the rotate_position. This is @@ -690,6 +699,13 @@ void lttng_consumer_cleanup(void); */ int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); +/* + * Copy the fields from the channel that need to be accessed in read-only + * directly from the stream. + */ +void consumer_stream_copy_ro_channel_values(struct lttng_consumer_stream *stream, + struct lttng_consumer_channel *channel); + struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, @@ -753,9 +769,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_stream *stream, unsigned long len, unsigned long padding, struct ctf_packet_index *index); +int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream); int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); +int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos); int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream); int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream); void *consumer_thread_metadata_poll(void *data); @@ -789,5 +808,12 @@ void consumer_del_stream_for_data(struct lttng_consumer_stream *stream); int consumer_add_metadata_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); +int lttng_consumer_rotate_channel(uint64_t key, char *path, + uint64_t relayd_id, uint32_t metadata, + struct lttng_consumer_local_data *ctx); +int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream); +int lttng_consumer_rotate_ready_streams(uint64_t key, + struct lttng_consumer_local_data *ctx); #endif /* LIB_CONSUMER_H */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 401f35940..89be8de0a 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -424,283 +424,6 @@ error: return ret; } -/* - * When a channel has finished the rotation of all its streams, inform the - * session daemon. - */ -static -int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, - uint64_t key) -{ - int ret; - - do { - ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); - } while (ret == -1 && errno == EINTR); - if (ret == -1) { - PERROR("write to the channel rotate pipe"); - } else { - DBG("Sent channel rotation notification for channel key %" - PRIu64, key); - } - - return ret; -} - -/* - * Performs the stream rotation for the rotate session feature if needed. - * It must be called with the stream and channel locks held. - * - * Return 0 on success, a negative number of error. - */ -static -int stream_rotation(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) -{ - int ret; - unsigned long consumed_pos; - - if (!stream->rotate_position && !stream->rotate_ready) { - ret = 0; - goto end; - } - - /* - * If we don't have the rotate_ready flag, check the consumed position - * to determine if we need to rotate. - */ - if (!stream->rotate_ready) { - ret = lttng_kconsumer_sample_snapshot_positions(stream); - if (ret < 0) { - ERR("Taking kernel snapshot positions"); - goto error; - } - - ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos); - if (ret < 0) { - ERR("Produced kernel snapshot position"); - goto error; - } - - fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos); - /* Rotate position not reached yet. */ - if (consumed_pos < stream->rotate_position) { - ret = 0; - goto end; - } - fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n", - consumed_pos, stream->rotate_position, stream->key); - } else { - fprintf(stderr, "Rotate position reached for stream %lu\n", - stream->key); - } - - ret = close(stream->out_fd); - if (ret < 0) { - PERROR("Closing tracefile"); - goto error; - } - - fprintf(stderr, "Rotating stream %lu to %s/%s\n", stream->key, - stream->chan->pathname, stream->name); - ret = utils_create_stream_file(stream->chan->pathname, stream->name, - stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid, NULL); - if (ret < 0) { - goto error; - } - stream->out_fd = ret; - stream->tracefile_size_current = 0; - - if (!stream->metadata_flag) { - struct lttng_index_file *index_file; - - lttng_index_file_put(stream->index_file); - - index_file = lttng_index_file_create(stream->chan->pathname, - stream->name, stream->uid, stream->gid, - stream->chan->tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR); - if (!index_file) { - goto error; - } - stream->index_file = index_file; - stream->out_fd_offset = 0; - } else { - /* - * Reset the position of what has been read from the metadata - * cache to 0 so we can dump it again. - */ - ret = kernctl_metadata_cache_dump(stream->wait_fd); - if (ret < 0) { - ERR("Failed to dump the metadata cache after rotation"); - goto error; - } - } - - stream->rotate_position = 0; - stream->rotate_ready = 0; - - if (--stream->chan->nr_stream_rotate_pending == 0) { - rotate_notify_sessiond(ctx, stream->chan->key); - fprintf(stderr, "SENT %lu\n", stream->chan->key); - } - - ret = 0; - goto end; - -error: - ret = -1; -end: - return ret; -} - -/* - * Sample the rotate position for all the streams of a channel. - * - * Returns 0 on success, < 0 on error - */ -static -int lttng_kconsumer_rotate_channel(uint64_t key, char *path, - uint64_t relayd_id, uint32_t metadata, - struct lttng_consumer_local_data *ctx) -{ - int ret; - struct lttng_consumer_channel *channel; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; - struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; - - DBG("Kernel consumer sample rotate position for channel %" PRIu64, key); - - rcu_read_lock(); - - channel = consumer_find_channel(key); - if (!channel) { - ERR("No channel found for key %" PRIu64, key); - ret = -1; - goto end; - } - pthread_mutex_lock(&channel->lock); - snprintf(channel->pathname, PATH_MAX, "%s", path); - - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, &channel->key, &iter.iter, - stream, node_channel_id.node) { - health_code_update(); - - /* - * Lock stream because we are about to change its state. - */ - pthread_mutex_lock(&stream->lock); - ret = lttng_kconsumer_sample_snapshot_positions(stream); - if (ret < 0) { - ERR("Taking kernel snapshot positions"); - goto end_unlock; - } else { - uint64_t consumed_pos; - - ret = lttng_kconsumer_get_produced_snapshot(stream, - &stream->rotate_position); - if (ret < 0) { - ERR("Produced kernel snapshot position"); - goto end_unlock; - } - fprintf(stderr, "Stream %lu should rotate after %lu to %s\n", - stream->key, stream->rotate_position, - channel->pathname); - lttng_kconsumer_get_consumed_snapshot(stream, - &consumed_pos); - fprintf(stderr, "consumed %lu\n", consumed_pos); - if (consumed_pos == stream->rotate_position) { - stream->rotate_ready = 1; - fprintf(stderr, "Stream %lu ready to rotate to %s\n", - stream->key, channel->pathname); - } - } - channel->nr_stream_rotate_pending++; - - ret = kernctl_buffer_flush(stream->wait_fd); - if (ret < 0) { - ERR("Failed to flush kernel stream"); - goto end_unlock; - } - - pthread_mutex_unlock(&stream->lock); - } - - ret = 0; - goto end_unlock_channel; - -end_unlock: - pthread_mutex_unlock(&stream->lock); -end_unlock_channel: - pthread_mutex_unlock(&channel->lock); -end: - rcu_read_unlock(); - return ret; -} - -/* - * Rotate all the ready streams. - * - * This is especially important for low throughput streams that have already - * been consumed, we cannot wait for their next packet to perform the - * rotation. - * - * Returns 0 on success, < 0 on error - */ -static -int lttng_kconsumer_rotate_ready_streams(uint64_t key, - struct lttng_consumer_local_data *ctx) -{ - int ret; - struct lttng_consumer_channel *channel; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; - struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; - - rcu_read_lock(); - - channel = consumer_find_channel(key); - if (!channel) { - ERR("No channel found for key %" PRIu64, key); - ret = -1; - goto end; - } - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, &channel->key, &iter.iter, - stream, node_channel_id.node) { - health_code_update(); - - /* - * Lock stream because we are about to change its state. - */ - pthread_mutex_lock(&stream->lock); - if (stream->rotate_ready == 0) { - pthread_mutex_unlock(&stream->lock); - continue; - } - ret = stream_rotation(ctx, stream); - if (ret < 0) { - pthread_mutex_unlock(&stream->lock); - ERR("Stream rotation error"); - goto end; - } - - pthread_mutex_unlock(&stream->lock); - } - - ret = 0; - -end: - rcu_read_unlock(); - return ret; -} - /* * Receive command from session daemon and process it. * @@ -935,6 +658,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_stream->chan = channel; new_stream->wait_fd = fd; + consumer_stream_copy_ro_channel_values(new_stream, channel); switch (channel->output) { case CONSUMER_CHANNEL_SPLICE: new_stream->output = LTTNG_EVENT_SPLICE; @@ -1410,7 +1134,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_ROTATE_CHANNEL: { - ret = lttng_kconsumer_rotate_channel(msg.u.rotate_channel.key, + ret = lttng_consumer_rotate_channel(msg.u.rotate_channel.key, msg.u.rotate_channel.pathname, msg.u.rotate_channel.relayd_id, msg.u.rotate_channel.metadata, @@ -1435,7 +1159,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * handle this, but it needs to be after the * consumer_send_status_msg() call. */ - ret = lttng_kconsumer_rotate_ready_streams( + ret = lttng_consumer_rotate_ready_streams( msg.u.rotate_channel.key, ctx); if (ret < 0) { ERR("Rotate channel failed"); @@ -1903,7 +1627,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, end: pthread_mutex_lock(&stream->chan->lock); - rotation_ret = stream_rotation(ctx, stream); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); if (rotation_ret < 0) { pthread_mutex_unlock(&stream->chan->lock); ERR("Stream rotation error"); diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index b5145cdb8..668de5775 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -183,7 +183,7 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, } goto error; } - + consumer_stream_copy_ro_channel_values(stream, channel); stream->chan = channel; error: @@ -1213,283 +1213,6 @@ error: return ret; } -/* - * When a channel has finished the rotation of all its streams, inform the - * session daemon. - */ -static -int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, - uint64_t key) -{ - int ret; - - do { - ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); - } while (ret == -1 && errno == EINTR); - if (ret == -1) { - PERROR("write to the channel rotate pipe"); - } else { - DBG("Sent channel rotation notification for channel key %" - PRIu64, key); - } - - return ret; -} - -/* - * Performs the stream rotation for the rotate session feature if needed. - * It must be called with the stream and channel locks held. - * - * FIXME: find a way to lock the chan without deadlock. same for kernel. - * - * Return 0 on success, a negative number of error. - */ -static -int stream_rotation(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) -{ - int ret; - unsigned long consumed_pos; - - if (!stream->rotate_position && !stream->rotate_ready) { - ret = 0; - goto end; - } - - /* - * If we don't have the rotate_ready flag, check the consumed position - * to determine if we need to rotate. - */ - if (!stream->rotate_ready) { - ret = lttng_ustconsumer_sample_snapshot_positions(stream); - if (ret < 0) { - ERR("Taking UST snapshot positions"); - goto error; - } - - ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos); - if (ret < 0) { - ERR("Produced UST snapshot position"); - goto error; - } - - fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos); - /* Rotate position not reached yet. */ - if (consumed_pos < stream->rotate_position) { - ret = 0; - goto end; - } - fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n", - consumed_pos, stream->rotate_position, stream->key); - } else { - fprintf(stderr, "Rotate position reached for stream %lu\n", - stream->key); - } - - ret = close(stream->out_fd); - if (ret < 0) { - PERROR("Closing tracefile"); - goto error; - } - - fprintf(stderr, "Rotating stream %lu to %s/%s\n", stream->key, - stream->chan->pathname, stream->name); - ret = utils_create_stream_file(stream->chan->pathname, stream->name, - stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid, NULL); - if (ret < 0) { - goto error; - } - stream->out_fd = ret; - stream->tracefile_size_current = 0; - - if (!stream->metadata_flag) { - struct lttng_index_file *index_file; - - lttng_index_file_put(stream->index_file); - - index_file = lttng_index_file_create(stream->chan->pathname, - stream->name, stream->uid, stream->gid, - stream->chan->tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR); - if (!index_file) { - goto error; - } - stream->index_file = index_file; - stream->out_fd_offset = 0; - } else { - /* - * Reset the position pushed from the metadata cache so it - * will write from the beginning on the next push. - */ - stream->ust_metadata_pushed = 0; - /* - * Wakeup the metadata thread so it dumps the metadata cache - * to file again. - */ - consumer_metadata_wakeup_pipe(stream->chan); - } - - stream->rotate_position = 0; - stream->rotate_ready = 0; - - if (--stream->chan->nr_stream_rotate_pending == 0) { - rotate_notify_sessiond(ctx, stream->chan->key); - fprintf(stderr, "SENT %lu\n", stream->chan->key); - } - - ret = 0; - goto end; - -error: - ret = -1; -end: - return ret; -} - -/* - * Sample the rotate position for all the streams of a channel. - * - * Returns 0 on success, < 0 on error - */ -static -int lttng_ustconsumer_rotate_channel(uint64_t key, char *path, - uint64_t relayd_id, uint32_t metadata, - struct lttng_consumer_local_data *ctx) -{ - int ret; - struct lttng_consumer_channel *channel; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; - struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; - - /* FIXME: metadata param is useless */ - - DBG("UST consumer sample rotate position for channel %" PRIu64, key); - - rcu_read_lock(); - - channel = consumer_find_channel(key); - if (!channel) { - ERR("No channel found for key %" PRIu64, key); - ret = -1; - goto end; - } - pthread_mutex_lock(&channel->lock); - snprintf(channel->pathname, PATH_MAX, "%s", path); - - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, &channel->key, &iter.iter, - stream, node_channel_id.node) { - uint64_t consumed_pos; - health_code_update(); - - /* - * Lock stream because we are about to change its state. - */ - pthread_mutex_lock(&stream->lock); - ret = lttng_ustconsumer_sample_snapshot_positions(stream); - if (ret < 0) { - ERR("Taking UST snapshot positions"); - goto end_unlock; - } - - ret = lttng_ustconsumer_get_produced_snapshot(stream, - &stream->rotate_position); - if (ret < 0) { - ERR("Produced UST snapshot position"); - goto end_unlock; - } - fprintf(stderr, "Stream %lu should rotate after %lu to %s\n", - stream->key, stream->rotate_position, - channel->pathname); - lttng_ustconsumer_get_consumed_snapshot(stream, - &consumed_pos); - fprintf(stderr, "consumed %lu\n", consumed_pos); - if (consumed_pos == stream->rotate_position) { - stream->rotate_ready = 1; - fprintf(stderr, "Stream %lu ready to rotate to %s\n", - stream->key, channel->pathname); - } - channel->nr_stream_rotate_pending++; - - ustctl_flush_buffer(stream->ustream, 1); - - pthread_mutex_unlock(&stream->lock); - } - - ret = 0; - goto end_unlock_channel; - -end_unlock: - pthread_mutex_unlock(&stream->lock); -end_unlock_channel: - pthread_mutex_unlock(&channel->lock); -end: - rcu_read_unlock(); - return ret; -} - -/* - * Rotate all the ready streams. - * - * This is especially important for low throughput streams that have already - * been consumed, we cannot wait for their next packet to perform the - * rotation. - * - * Returns 0 on success, < 0 on error - */ -static -int lttng_ustconsumer_rotate_ready_streams(uint64_t key, - struct lttng_consumer_local_data *ctx) -{ - int ret; - struct lttng_consumer_channel *channel; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; - struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; - - rcu_read_lock(); - - channel = consumer_find_channel(key); - if (!channel) { - ERR("No channel found for key %" PRIu64, key); - ret = -1; - goto end; - } - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, &channel->key, &iter.iter, - stream, node_channel_id.node) { - health_code_update(); - - /* - * Lock stream because we are about to change its state. - */ - pthread_mutex_lock(&stream->lock); - if (stream->rotate_ready == 0) { - pthread_mutex_unlock(&stream->lock); - continue; - } - ret = stream_rotation(ctx, stream); - if (ret < 0) { - pthread_mutex_unlock(&stream->lock); - ERR("Stream rotation error"); - goto end; - } - - pthread_mutex_unlock(&stream->lock); - } - - ret = 0; - -end: - rcu_read_unlock(); - return ret; -} - /* * Receive the metadata updates from the sessiond. Supports receiving * overlapping metadata, but is needs to always belong to a contiguous @@ -2209,7 +1932,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_ROTATE_CHANNEL: { - ret = lttng_ustconsumer_rotate_channel(msg.u.rotate_channel.key, + ret = lttng_consumer_rotate_channel(msg.u.rotate_channel.key, msg.u.rotate_channel.pathname, msg.u.rotate_channel.relayd_id, msg.u.rotate_channel.metadata, @@ -2234,7 +1957,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * handle this, but it needs to be after the * consumer_send_status_msg() call. */ - ret = lttng_ustconsumer_rotate_ready_streams( + ret = lttng_consumer_rotate_ready_streams( msg.u.rotate_channel.key, ctx); if (ret < 0) { ERR("Rotate channel failed"); @@ -2322,6 +2045,15 @@ void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream) return ustctl_get_mmap_base(stream->ustream); } +void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream, + int producer_active) +{ + assert(stream); + assert(stream->ustream); + + ustctl_flush_buffer(stream->ustream, producer_active); +} + /* * Take a snapshot for a specific stream. * @@ -3000,7 +2732,7 @@ end: /* FIXME: do we need this lock, it causes deadlocks when called * at the same time with lttng_ustconsumer_rotate_channel ? */ // pthread_mutex_lock(&stream->chan->lock); - rotation_ret = stream_rotation(ctx, stream); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); if (rotation_ret < 0) { // pthread_mutex_unlock(&stream->chan->lock); ret = -1; diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h index b5ff16186..7603b3581 100644 --- a/src/common/ust-consumer/ust-consumer.h +++ b/src/common/ust-consumer/ust-consumer.h @@ -52,6 +52,8 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream); int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream, unsigned long *off); void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream); +void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream, + int producer_active); int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, uint64_t *stream_id); int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream); @@ -188,6 +190,12 @@ void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream) return NULL; } static inline +void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream, + int producer_active) +{ + return; +} +static inline void lttng_ustconsumer_close_all_metadata(struct lttng_ht *ht) { }