X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=cd9b273c28f32f36be201df68263100a11c7b7bb;hp=6d6690a32161a4cbc24e05157f31407fe9c1df74;hb=2463b7879c00298daa79744cdaae82ac061a4ed8;hpb=f5ba75b4f0c0b44092c76bc931b25b24a2e62718 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 6d6690a32..cd9b273c2 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -37,6 +37,7 @@ #include #include #include +#include #include #include "ust-consumer.h" @@ -78,6 +79,11 @@ static void destroy_channel(struct lttng_consumer_channel *channel) lttng_ustconsumer_del_channel(channel); lttng_ustconsumer_free_channel(channel); } + + if (channel->trace_chunk) { + lttng_trace_chunk_put(channel->trace_chunk); + } + free(channel); } @@ -347,48 +353,6 @@ error_alloc: return ret; } -/* - * create_posix_shm is never called concurrently within a process. - */ -static -int create_posix_shm(void) -{ - char tmp_name[NAME_MAX]; - int shmfd, ret; - - ret = snprintf(tmp_name, NAME_MAX, "/ust-shm-consumer-%d", getpid()); - if (ret < 0) { - PERROR("snprintf"); - return -1; - } - /* - * Allocate shm, and immediately unlink its shm oject, keeping - * only the file descriptor as a reference to the object. - * We specifically do _not_ use the / at the beginning of the - * pathname so that some OS implementations can keep it local to - * the process (POSIX leaves this implementation-defined). - */ - shmfd = shm_open(tmp_name, O_CREAT | O_EXCL | O_RDWR, 0700); - if (shmfd < 0) { - PERROR("shm_open"); - goto error_shm_open; - } - ret = shm_unlink(tmp_name); - if (ret < 0 && errno != ENOENT) { - PERROR("shm_unlink"); - goto error_shm_release; - } - return shmfd; - -error_shm_release: - ret = close(shmfd); - if (ret) { - PERROR("close"); - } -error_shm_open: - return -1; -} - static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, const struct lttng_credentials *session_credentials) { @@ -396,7 +360,7 @@ static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, int ret; if (!channel->shm_path[0]) { - return create_posix_shm(); + return shm_create_anonymous("ust-consumer"); } ret = get_stream_shm_path(shm_path, channel->shm_path, cpu); if (ret) { @@ -404,7 +368,8 @@ static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, } return run_as_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, - session_credentials->uid, session_credentials->gid); + lttng_credentials_get_uid(session_credentials), + lttng_credentials_get_gid(session_credentials)); error_shm_path: return -1; @@ -482,8 +447,10 @@ error_open: ERR("Cannot get stream shm path"); } closeret = run_as_unlink(shm_path, - channel->buffer_credentials.value.uid, - channel->buffer_credentials.value.gid); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials))); if (closeret) { PERROR("unlink %s", shm_path); } @@ -492,8 +459,10 @@ error_open: /* Try to rmdir all directories under shm_path root. */ if (channel->root_shm_path[0]) { (void) run_as_rmdir_recursive(channel->root_shm_path, - channel->buffer_credentials.value.uid, - channel->buffer_credentials.value.gid, + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(stream_fds); @@ -1456,8 +1425,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, struct ustctl_consumer_channel_attr attr; const uint64_t chunk_id = msg.u.ask_channel.chunk_id.value; const struct lttng_credentials buffer_credentials = { - .uid = msg.u.ask_channel.buffer_credentials.uid, - .gid = msg.u.ask_channel.buffer_credentials.gid, + .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.ask_channel.buffer_credentials.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.ask_channel.buffer_credentials.gid), }; /* Create a plain object and reserve a channel key. */ @@ -2067,8 +2036,8 @@ end_rotate_channel_nosignal: case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { const struct lttng_credentials credentials = { - .uid = msg.u.create_trace_chunk.credentials.value.uid, - .gid = msg.u.create_trace_chunk.credentials.value.gid, + .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid), }; const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set; @@ -2176,6 +2145,28 @@ end_rotate_channel_nosignal: msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; } + case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS: + { + const uint64_t key = msg.u.open_channel_packets.key; + struct lttng_consumer_channel *channel = + consumer_find_channel(key); + + if (channel) { + pthread_mutex_lock(&channel->lock); + ret_code = lttng_consumer_open_channel_packets(channel); + pthread_mutex_unlock(&channel->lock); + } else { + /* + * The channel could have disappeared in per-pid + * buffering mode. + */ + DBG("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + goto end_msg_sessiond; + } default: break; } @@ -2375,8 +2366,10 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) ERR("Cannot get stream shm path"); } ret = run_as_unlink(shm_path, - chan->buffer_credentials.value.uid, - chan->buffer_credentials.value.gid); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials))); if (ret) { PERROR("unlink %s", shm_path); } @@ -2395,8 +2388,10 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) /* Try to rmdir all directories under shm_path root. */ if (chan->root_shm_path[0]) { (void) run_as_rmdir_recursive(chan->root_shm_path, - chan->buffer_credentials.value.uid, - chan->buffer_credentials.value.gid, + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials)), LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(chan->stream_fds); @@ -2430,20 +2425,19 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) } static -void metadata_stream_reset_cache(struct lttng_consumer_stream *stream) +void metadata_stream_reset_cache_consumed_position( + struct lttng_consumer_stream *stream) { DBG("Reset metadata cache of session %" PRIu64, stream->chan->session_id); stream->ust_metadata_pushed = 0; - stream->metadata_version = stream->chan->metadata_cache->version; - stream->reset_metadata_flag = 1; } /* * Write up to one packet from the metadata cache to the channel. * - * Returns the number of bytes pushed in the cache, or a negative value - * on error. + * Returns the number of bytes pushed from the cache into the ring buffer, or a + * negative value on error. */ static int commit_one_metadata_packet(struct lttng_consumer_stream *stream) @@ -2452,10 +2446,41 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - if (stream->chan->metadata_cache->max_offset - == stream->ust_metadata_pushed) { - ret = 0; - goto end; + if (stream->chan->metadata_cache->max_offset == + stream->ust_metadata_pushed) { + /* + * In the context of a user space metadata channel, a + * change in version can be detected in two ways: + * 1) During the pre-consume of the `read_subbuffer` loop, + * 2) When populating the metadata ring buffer (i.e. here). + * + * This function is invoked when there is no metadata + * available in the ring-buffer. If all data was consumed + * up to the size of the metadata cache, there is no metadata + * to insert in the ring-buffer. + * + * However, the metadata version could still have changed (a + * regeneration without any new data will yield the same cache + * size). + * + * The cache's version is checked for a version change and the + * consumed position is reset if one occurred. + * + * This check is only necessary for the user space domain as + * it has to manage the cache explicitly. If this reset was not + * performed, no metadata would be consumed (and no reset would + * occur as part of the pre-consume) until the metadata size + * exceeded the cache size. + */ + if (stream->metadata_version != + stream->chan->metadata_cache->version) { + metadata_stream_reset_cache_consumed_position(stream); + consumer_stream_metadata_set_version(stream, + stream->chan->metadata_cache->version); + } else { + ret = 0; + goto end; + } } write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, @@ -2496,15 +2521,13 @@ end: * awaiting on metadata to be pushed out. * * The RCU read side lock must be held by the caller. - * - * Return 0 if new metadatda is available, EAGAIN if the metadata stream - * is empty or a negative value on error. */ -int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, +enum sync_metadata_status lttng_ustconsumer_sync_metadata( + struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *metadata_stream) { int ret; - int retry = 0; + enum sync_metadata_status status; struct lttng_consumer_channel *metadata_channel; assert(ctx); @@ -2519,6 +2542,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0); pthread_mutex_lock(&metadata_stream->lock); if (ret < 0) { + status = SYNC_METADATA_STATUS_ERROR; goto end; } @@ -2536,38 +2560,30 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, if (consumer_stream_is_deleted(metadata_stream)) { DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization", metadata_stream->key); - ret = 0; + status = SYNC_METADATA_STATUS_NO_DATA; goto end; } ret = commit_one_metadata_packet(metadata_stream); - if (ret <= 0) { + if (ret < 0) { + status = SYNC_METADATA_STATUS_ERROR; goto end; } else if (ret > 0) { - retry = 1; + status = SYNC_METADATA_STATUS_NEW_DATA; + } else /* ret == 0 */ { + status = SYNC_METADATA_STATUS_NO_DATA; + goto end; } ret = ustctl_snapshot(metadata_stream->ustream); if (ret < 0) { - if (errno != EAGAIN) { - ERR("Sync metadata, taking UST snapshot"); - goto end; - } - DBG("No new metadata when syncing them."); - /* No new metadata, exit. */ - ret = ENODATA; + ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", ret); + status = SYNC_METADATA_STATUS_ERROR; goto end; } - /* - * After this flush, we still need to extract metadata. - */ - if (retry) { - ret = EAGAIN; - } - end: - return ret; + return status; } /* @@ -2680,7 +2696,7 @@ static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, goto end; } - subbuf->info.metadata.version = stream->chan->metadata_cache->version; + subbuf->info.metadata.version = stream->metadata_version; end: return ret; @@ -2924,7 +2940,7 @@ static int lttng_ustconsumer_set_stream_ops( stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info; stream->read_subbuffer_ops.reset_metadata = - metadata_stream_reset_cache; + metadata_stream_reset_cache_consumed_position; if (stream->chan->is_live) { stream->read_subbuffer_ops.on_sleep = signal_metadata; ret = consumer_stream_enable_metadata_bucketization(