X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=4a2e4e07d7c0b8e5eac8e34927afad5f8762da4a;hb=23d565989350c270c68e9a6c8edfbe2dd6a6895d;hp=5075e05929238ade072391bf5367ae7f40c0433f;hpb=ab5be9fa2eb5ba9600a82cd18fd3cfcbac69169a;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 5075e0592..4a2e4e07d 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -7,6 +7,8 @@ * */ +#include "common/buffer-view.h" +#include #define _LGPL_SOURCE #include #include @@ -113,6 +115,25 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, return ret; } +static +int get_current_subbuf_addr(struct lttng_consumer_stream *stream, + const char **addr) +{ + int ret; + unsigned long mmap_offset; + const char *mmap_base = stream->mmap_base; + + ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); + if (ret < 0) { + PERROR("Failed to get mmap read offset"); + goto error; + } + + *addr = mmap_base + mmap_offset; +error: + return ret; +} + /* * Take a snapshot of all the stream of a channel * RCU read-side lock must be held across this function to ensure existence of @@ -228,9 +249,10 @@ static int lttng_kconsumer_snapshot_channel( while ((long) (consumed_pos - produced_pos) < 0) { ssize_t read_len; unsigned long len, padded_len; + const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; health_code_update(); - DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos); ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos); @@ -257,7 +279,15 @@ static int lttng_kconsumer_snapshot_channel( goto error_put_subbuf; } - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + goto error_put_subbuf; + } + + subbuf_view = lttng_buffer_view_init( + subbuf_addr, 0, padded_len); + read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + stream, &subbuf_view, padded_len - len, NULL); /* * We write the padded len in local tracefiles but the data len @@ -483,6 +513,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.tracefile_count, 0, msg.u.channel.monitor, msg.u.channel.live_timer_interval, + msg.u.channel.is_live, NULL, NULL); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); @@ -624,7 +655,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); pthread_mutex_lock(&channel->lock); - new_stream = consumer_allocate_stream(channel->key, + new_stream = consumer_allocate_stream( + channel, + channel->key, fd, channel->name, channel->relayd_id, @@ -646,7 +679,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto error_add_stream_nosignal; } - new_stream->chan = channel; new_stream->wait_fd = fd; ret = kernctl_get_max_subbuf_size(new_stream->wait_fd, &new_stream->max_sb_size); @@ -1558,26 +1590,14 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 1, rotation_ret; + int err, write_index = 1; ssize_t ret = 0; int infd = stream->wait_fd; struct ctf_packet_index index = {}; + bool in_error_state = false; DBG("In read_subbuffer (infd : %d)", infd); - /* - * If the stream was flagged to be ready for rotation before we extract the - * next packet, rotate it now. - */ - if (stream->rotate_ready) { - DBG("Rotate stream before extracting data"); - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); @@ -1691,6 +1711,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } break; case CONSUMER_CHANNEL_MMAP: + { + const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; + /* Get subbuffer size without padding */ err = kernctl_get_subbuf_size(infd, &subbuf_size); if (err != 0) { @@ -1710,18 +1734,25 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto error; } + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + goto error_put_subbuf; + } + /* Make sure the tracer is not gone mad on us! */ assert(len >= subbuf_size); padding = len - subbuf_size; + subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len); + /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, - padding, &index); + ret = lttng_consumer_on_read_subbuffer_mmap( + ctx, stream, &subbuf_view, padding, &index); /* - * The mmap operation should write subbuf_size amount of data when - * network streaming or the full padding (len) size when we are _not_ - * streaming. + * The mmap operation should write subbuf_size amount of data + * when network streaming or the full padding (len) size when we + * are _not_ streaming. */ if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { @@ -1736,11 +1767,12 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, write_index = 0; } break; + } default: ERR("Unknown output method"); ret = -EPERM; } - +error_put_subbuf: err = kernctl_put_next_subbuf(infd); if (err != 0) { if (err == -EFAULT) { @@ -1751,11 +1783,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } ret = err; goto error; + } else if (in_error_state) { + goto error; } /* Write index if needed. */ if (!write_index) { - goto rotate; + goto end; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -1788,25 +1822,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto error; } -rotate: - /* - * After extracting the packet, we check if the stream is now ready to be - * rotated and perform the action immediately. - */ - rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); - if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } else if (rotation_ret < 0) { - ERR("Checking if stream is ready to rotate"); - ret = -1; - goto error; - } - +end: error: return ret; }