X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=0318eecea05600ded402806ef24d6d32f7a7b23e;hb=ace0e591600a76d76d54edef87ffb24afff6d209;hp=c7f208b8dfe3006e89751f6416cf185216efdcd0;hpb=e0547b835d76e0a3a2c3082d7c10e6902a1f3e04;p=deliverable%2Flttng-tools.git

diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
index c7f208b8d..0318eecea 100644
--- a/src/common/ust-consumer/ust-consumer.c
+++ b/src/common/ust-consumer/ust-consumer.c
@@ -16,6 +16,7 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
+#include
 #define _LGPL_SOURCE
 #include
 #include
@@ -553,7 +554,7 @@ static int send_sessiond_channel(int sock,
 {
     int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
     struct lttng_consumer_stream *stream;
-    uint64_t net_seq_idx = -1ULL;
+    uint64_t relayd_id = -1ULL;
 
     assert(channel);
     assert(ctx);
@@ -578,8 +579,8 @@
             }
             ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
         }
-        if (net_seq_idx == -1ULL) {
-            net_seq_idx = stream->net_seq_idx;
+        if (relayd_id == -1ULL) {
+            relayd_id = stream->relayd_id;
         }
     }
 }
@@ -768,10 +769,19 @@ static int flush_channel(uint64_t chan_key)
         health_code_update();
 
         pthread_mutex_lock(&stream->lock);
+
+        /*
+         * Protect against concurrent teardown of a stream.
+         */
+        if (cds_lfht_is_node_deleted(&stream->node.node)) {
+            goto next;
+        }
+
         if (!stream->quiescent) {
             ustctl_flush_buffer(stream->ustream, 0);
             stream->quiescent = true;
         }
+next:
         pthread_mutex_unlock(&stream->lock);
     }
 error:
@@ -831,6 +841,7 @@ static int close_metadata(uint64_t chan_key)
 {
     int ret = 0;
     struct lttng_consumer_channel *channel;
+    unsigned int channel_monitor;
 
     DBG("UST consumer close metadata key %" PRIu64, chan_key);
 
@@ -849,13 +860,48 @@ static int close_metadata(uint64_t chan_key)
     pthread_mutex_lock(&consumer_data.lock);
     pthread_mutex_lock(&channel->lock);
-
+    channel_monitor = channel->monitor;
     if (cds_lfht_is_node_deleted(&channel->node.node)) {
         goto error_unlock;
     }
 
     lttng_ustconsumer_close_metadata(channel);
+    pthread_mutex_unlock(&channel->lock);
+    pthread_mutex_unlock(&consumer_data.lock);
+
+    /*
+     * The ownership of a metadata channel depends on the type of
+     * session to which it belongs. In effect, the monitor flag is checked
+     * to determine if this metadata channel is in "snapshot" mode or not.
+     *
+     * In the non-snapshot case, the metadata channel is created along with
+     * a single stream which will remain present until the metadata channel
+     * is destroyed (on the destruction of its session). In this case, the
+     * metadata stream in "monitored" by the metadata poll thread and holds
+     * the ownership of its channel.
+     *
+     * Closing the metadata will cause the metadata stream's "metadata poll
+     * pipe" to be closed. Closing this pipe will wake-up the metadata poll
+     * thread which will teardown the metadata stream which, in return,
+     * deletes the metadata channel.
+     *
+     * In the snapshot case, the metadata stream is created and destroyed
+     * on every snapshot record. Since the channel doesn't have an owner
+     * other than the session daemon, it is safe to destroy it immediately
+     * on reception of the CLOSE_METADATA command.
+     */
+    if (!channel_monitor) {
+        /*
+         * The channel and consumer_data locks must be
+         * released before this call since consumer_del_channel
+         * re-acquires the channel and consumer_data locks to teardown
+         * the channel and queue its reclamation by the "call_rcu"
+         * worker thread.
+         */
+        consumer_del_channel(channel);
+    }
+    return ret;
 error_unlock:
     pthread_mutex_unlock(&channel->lock);
     pthread_mutex_unlock(&consumer_data.lock);
@@ -903,7 +949,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
     }
 
     /* Send metadata stream to relayd if needed. */
-    if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
+    if (metadata->metadata_stream->relayd_id != (uint64_t) -1ULL) {
         ret = consumer_send_relayd_stream(metadata->metadata_stream,
                 metadata->pathname);
         if (ret < 0) {
@@ -911,7 +957,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
             goto error;
         }
         ret = consumer_send_relayd_streams_sent(
-                metadata->metadata_stream->net_seq_idx);
+                metadata->metadata_stream->relayd_id);
         if (ret < 0) {
             ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
             goto error;
@@ -1003,7 +1049,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
     assert(metadata_stream);
 
     if (relayd_id != (uint64_t) -1ULL) {
-        metadata_stream->net_seq_idx = relayd_id;
+        metadata_stream->relayd_id = relayd_id;
         ret = consumer_send_relayd_stream(metadata_stream, path);
         if (ret < 0) {
             goto error_stream;
@@ -1043,6 +1089,35 @@ error:
     return ret;
 }
 
+static
+int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
+        const char **addr)
+{
+    int ret;
+    unsigned long mmap_offset;
+    const char *mmap_base;
+
+    mmap_base = ustctl_get_mmap_base(stream->ustream);
+    if (!mmap_base) {
+        ERR("Failed to get mmap base for stream `%s`",
+                stream->name);
+        ret = -EPERM;
+        goto error;
+    }
+
+    ret = ustctl_get_mmap_read_offset(stream->ustream, &mmap_offset);
+    if (ret != 0) {
+        ERR("Failed to get mmap offset for stream `%s`", stream->name);
+        ret = -EINVAL;
+        goto error;
+    }
+
+    *addr = mmap_base + mmap_offset;
+error:
+    return ret;
+
+}
+
 /*
  * Take a snapshot of all the stream of a channel.
  *
@@ -1076,14 +1151,11 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
     DBG("UST consumer snapshot channel %" PRIu64, key);
 
     cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-        /* Are we at a position _before_ the first available packet ? */
-        bool before_first_packet = true;
-
         health_code_update();
 
         /* Lock stream because we are about to change its state. */
         pthread_mutex_lock(&stream->lock);
-        stream->net_seq_idx = relayd_id;
+        stream->relayd_id = relayd_id;
         if (use_relayd) {
             ret = consumer_send_relayd_stream(stream, path);
             if (ret < 0) {
@@ -1150,7 +1222,8 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
         while (consumed_pos < produced_pos) {
             ssize_t read_len;
             unsigned long len, padded_len;
-            int lost_packet = 0;
+            const char *subbuf_addr;
+            struct lttng_buffer_view subbuf_view;
 
             health_code_update();
@@ -1164,15 +1237,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                 }
                 DBG("UST consumer get subbuf failed. Skipping it.");
                 consumed_pos += stream->max_sb_size;
-
-                /*
-                 * Start accounting lost packets only when we
-                 * already have extracted packets (to match the
-                 * content of the final snapshot).
-                 */
-                if (!before_first_packet) {
-                    lost_packet = 1;
-                }
+                stream->chan->lost_packets++;
                 continue;
             }
@@ -1188,8 +1253,16 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                 goto error_put_subbuf;
             }
 
-            read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
-                    padded_len - len, NULL);
+            ret = get_current_subbuf_addr(stream, &subbuf_addr);
+            if (ret) {
+                goto error_put_subbuf;
+            }
+
+            subbuf_view = lttng_buffer_view_init(
+                    subbuf_addr, 0, padded_len);
+            read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+                    stream, &subbuf_view, padded_len - len,
+                    NULL);
             if (use_relayd) {
                 if (read_len != len) {
                     ret = -EPERM;
@@ -1208,16 +1281,6 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                     goto error_close_stream;
                 }
                 consumed_pos += stream->max_sb_size;
-
-            /*
-             * Only account lost packets located between
-             * succesfully extracted packets (do not account before
-             * and after since they are not visible in the
-             * resulting snapshot).
-             */
-            stream->chan->lost_packets += lost_packet;
-            lost_packet = 0;
-            before_first_packet = false;
         }
 
         /* Simply close the stream so we can use it on the next snapshot. */
@@ -1354,7 +1417,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
     case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
     {
         /* Session daemon status message are handled in the following call. */
-        ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+        consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
                 msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
                 &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
                 msg.u.relayd_sock.relayd_session_id);
@@ -1912,29 +1975,13 @@ error_fatal:
     return -1;
 }
 
-/*
- * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
- * compiled out, we isolate it in this library.
- */
-int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
-        unsigned long *off)
+void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream,
+        int producer_active)
 {
     assert(stream);
     assert(stream->ustream);
 
-    return ustctl_get_mmap_read_offset(stream->ustream, off);
-}
-
-/*
- * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
- * compiled out, we isolate it in this library.
- */
-void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
-{
-    assert(stream);
-    assert(stream->ustream);
-
-    return ustctl_get_mmap_base(stream->ustream);
+    ustctl_flush_buffer(stream->ustream, producer_active);
 }
 
 /*
@@ -2273,10 +2320,10 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
      * because we locked the metadata thread.
      */
     ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
+    pthread_mutex_lock(&metadata->lock);
     if (ret < 0) {
         goto end;
     }
-    pthread_mutex_lock(&metadata->lock);
 
     ret = commit_one_metadata_packet(metadata);
     if (ret <= 0) {
@@ -2430,6 +2477,8 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
     long ret = 0;
     struct ustctl_consumer_stream *ustream;
     struct ctf_packet_index index;
+    const char *subbuf_addr;
+    struct lttng_buffer_view subbuf_view;
 
     assert(stream);
     assert(stream->ustream);
@@ -2496,6 +2545,8 @@ retry:
         index.offset = htobe64(stream->out_fd_offset);
         ret = get_index_values(&index, ustream);
         if (ret < 0) {
+            err = ustctl_put_subbuf(ustream);
+            assert(err == 0);
             goto end;
         }
@@ -2503,6 +2554,8 @@ retry:
         ret = update_stream_stats(stream);
         if (ret < 0) {
             PERROR("kernctl_get_events_discarded");
+            err = ustctl_put_subbuf(ustream);
+            assert(err == 0);
             goto end;
         }
     } else {
@@ -2521,14 +2574,25 @@ retry:
     assert(len >= subbuf_size);
     padding = len - subbuf_size;
+
+    ret = get_current_subbuf_addr(stream, &subbuf_addr);
+    if (ret) {
+        write_index = 0;
+        goto error_put_subbuf;
+    }
+
+    subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
+
     /* write the subbuffer to the tracefile */
-    ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
+    ret = lttng_consumer_on_read_subbuffer_mmap(
+            ctx, stream, &subbuf_view, padding, &index);
     /*
-     * The mmap operation should write subbuf_size amount of data when network
-     * streaming or the full padding (len) size when we are _not_ streaming.
+     * The mmap operation should write subbuf_size amount of data when
+     * network streaming or the full padding (len) size when we are _not_
+     * streaming.
      */
-    if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
-            (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
+    if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) ||
+            (ret != len && stream->relayd_id == (uint64_t) -1ULL)) {
         /*
          * Display the error but continue processing to try to release the
          * subbuffer. This is a DBG statement since any unexpected kill or
@@ -2542,6 +2606,7 @@ retry:
                 ret, len, subbuf_size);
         write_index = 0;
     }
+error_put_subbuf:
     err = ustctl_put_next_subbuf(ustream);
     assert(err == 0);
@@ -2609,7 +2674,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
     assert(stream);
 
     /* Don't create anything if this is set for streaming. */
-    if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
+    if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) {
         ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                 stream->chan->tracefile_size, stream->tracefile_count_current,
                 stream->uid, stream->gid, NULL);
@@ -2837,7 +2902,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
     request.key = channel->key;
 
     DBG("Sending metadata request to sessiond, session id %" PRIu64
-            ", per-pid %" PRIu64 ", app UID %u and channek key %" PRIu64,
+            ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64,
             request.session_id, request.session_id_per_pid, request.uid,
             request.key);
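
The comment added in close_metadata() spells out why both the channel and consumer_data locks are released before consumer_del_channel() runs: that helper re-acquires the same locks to tear the channel down and queue its reclamation through call_rcu. The sketch below is only a minimal, self-contained model of that ordering constraint using plain pthread mutexes; the names registry_lock, object_lock, close_object and teardown_object are invented for the example and are not lttng-tools APIs.

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t registry_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t object_lock = PTHREAD_MUTEX_INITIALIZER;

/* Stand-in for consumer_del_channel(): it takes both locks itself. */
static void teardown_object(void)
{
    pthread_mutex_lock(&registry_lock);
    pthread_mutex_lock(&object_lock);
    puts("object torn down");
    pthread_mutex_unlock(&object_lock);
    pthread_mutex_unlock(&registry_lock);
}

static void close_object(int destroy)
{
    pthread_mutex_lock(&registry_lock);
    pthread_mutex_lock(&object_lock);
    puts("object closed");
    /*
     * Both locks must be dropped before the teardown helper is called:
     * with default (non-recursive) mutexes, re-locking them while this
     * thread still holds them would deadlock.
     */
    pthread_mutex_unlock(&object_lock);
    pthread_mutex_unlock(&registry_lock);

    if (destroy) {
        teardown_object();
    }
}

int main(void)
{
    close_object(1);
    return 0;
}

Compile with -pthread; the point is simply that the unlock calls have to precede teardown_object(), mirroring the order close_metadata() now uses before calling consumer_del_channel().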
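
The new get_current_subbuf_addr() helper combines the stream's mmap base and current read offset into a single address, and the read paths now hand lttng_consumer_on_read_subbuffer_mmap() a lttng_buffer_view built from that address instead of a bare length. Below is a standalone sketch of the same base-plus-offset-to-bounded-view pattern; struct buffer_view, view_init() and write_view() are simplified stand-ins made up for this example, not the real lttng-tools buffer-view API.

#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Simplified stand-in for a bounded, read-only window into an existing buffer. */
struct buffer_view {
    const char *data;
    size_t size;
};

/* Same shape as the pattern in the patch: base + read offset -> view. */
static struct buffer_view view_init(const char *base, size_t offset, size_t len)
{
    struct buffer_view view = { .data = base + offset, .size = len };

    return view;
}

/* The write path only ever sees the view, never a raw pointer/length pair. */
static size_t write_view(FILE *out, const struct buffer_view *view)
{
    return fwrite(view->data, 1, view->size, out);
}

int main(void)
{
    /* Pretend this is the mmap()'d ring buffer of a stream. */
    const char ring[] = "HDR|payload of one sub-buffer|PAD";
    size_t read_offset = 4;    /* as reported by the ring buffer */
    size_t padded_len = strlen(ring) - read_offset;
    struct buffer_view view = view_init(ring, read_offset, padded_len);

    assert(write_view(stdout, &view) == padded_len);
    putchar('\n');
    return 0;
}

Keeping the pointer and the number of readable bytes in one object means the write path cannot be handed an address without also being told how far it may read, which is the point of the change in both snapshot_channel() and lttng_ustconsumer_read_subbuffer().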
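
In lttng_ustconsumer_sync_metadata(), metadata->lock is now taken immediately after requesting the metadata, before the return value is checked. The likely intent, though the patch does not state it, is that every path reaching the end label, including the early error exit, then holds the lock exactly once and releases it in a single place. The sketch below shows that shape with invented names (cache_lock, request_data, sync_cache); it assumes a function whose common exit label performs the unlock.

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

/* Stand-in for a request that can fail; returns a negative value on error. */
static int request_data(int fail)
{
    return fail ? -1 : 0;
}

static int sync_cache(int fail)
{
    int ret;

    ret = request_data(fail);
    /*
     * Take the lock before checking the result so that the error path and
     * the success path both leave through the single unlock below.
     */
    pthread_mutex_lock(&cache_lock);
    if (ret < 0) {
        goto end;
    }

    puts("cache updated");
end:
    pthread_mutex_unlock(&cache_lock);
    return ret;
}

int main(void)
{
    printf("ok run: %d\n", sync_cache(0));
    printf("error run: %d\n", sync_cache(1));
    return 0;
}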
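
In lttng_ustconsumer_read_subbuffer(), the failure paths that run after a sub-buffer has already been acquired now give it back: the index and statistics errors call ustctl_put_subbuf() before jumping to end, and the new error_put_subbuf label makes the mmap-address failure still reach ustctl_put_next_subbuf(). The sketch below shows the general acquire/release-on-every-exit shape; get_record, put_record and consume_one are invented names, not ust-ctl or lttng-tools functions, and the counter only stands in for ownership of a sub-buffer.

#include <assert.h>
#include <stdio.h>

/* Models "this reader currently owns a sub-buffer". */
static int acquired;

static void get_record(void)
{
    acquired++;
}

static void put_record(void)
{
    assert(acquired > 0);
    acquired--;
}

/* Returns 0 on success, -1 when a step after acquisition fails. */
static int consume_one(int fail_midway)
{
    int ret = 0;

    get_record();
    if (fail_midway) {
        /*
         * Mirror of the patch: release the record that is already owned
         * before bailing out, so no exit path leaks it.
         */
        ret = -1;
        put_record();
        goto end;
    }

    puts("record written");
    put_record();
end:
    return ret;
}

int main(void)
{
    consume_one(0);
    consume_one(1);
    /* Every acquisition was matched by a release on both paths. */
    assert(acquired == 0);
    return 0;
}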