X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=1b01d414a0248b0207bbf72aba620a8fa62b93e4;hp=67f60d816c0de6c31407840f5f2b2b160e049f1b;hb=0c759fc95033a3d6d7cb939f39dd643ce7e127ee;hpb=194ee0773b191910be3da7e8bb8b340d1edbba71 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 67f60d816..1b01d414a 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -38,6 +39,8 @@ #include #include #include +#include +#include #include "kernel-consumer.h" @@ -57,8 +60,8 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) ret = kernctl_snapshot(infd); if (ret != 0) { - errno = -ret; perror("Getting sub-buffer snapshot."); + ret = -errno; } return ret; @@ -77,8 +80,8 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, ret = kernctl_snapshot_get_produced(infd, pos); if (ret != 0) { - errno = -ret; perror("kernctl_snapshot_get_produced"); + ret = -errno; } return ret; @@ -97,8 +100,8 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, ret = kernctl_snapshot_get_consumed(infd, pos); if (ret != 0) { - errno = -ret; perror("kernctl_snapshot_get_consumed"); + ret = -errno; } return ret; @@ -110,20 +113,21 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, * Returns 0 on success, < 0 on error */ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, - uint64_t relayd_id, struct lttng_consumer_local_data *ctx) + uint64_t relayd_id, uint64_t max_stream_size, + struct lttng_consumer_local_data *ctx) { int ret; unsigned long consumed_pos, produced_pos; struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; - DBG("Kernel consumer snapshot channel %lu", key); + DBG("Kernel consumer snapshot channel %" PRIu64, key); rcu_read_lock(); channel = consumer_find_channel(key); if (!channel) { - ERR("No channel found for key %lu", key); + ERR("No channel found for key %" PRIu64, key); ret = -1; goto end; } @@ -136,6 +140,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, } cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + + health_code_update(); + /* * Lock stream because we are about to change its state. */ @@ -157,7 +164,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = utils_create_stream_file(path, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid); + stream->uid, stream->gid, NULL); if (ret < 0) { ERR("utils_create_stream_file"); goto end_unlock; @@ -166,13 +173,14 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, stream->out_fd = ret; stream->tracefile_size_current = 0; - DBG("Kernel consumer snapshot stream %s/%s (%lu)", path, - stream->name, stream->key); + DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")", + path, stream->name, stream->key); } ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { ERR("Failed to flush kernel stream"); + ret = -errno; goto end_unlock; } @@ -199,20 +207,33 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, &stream->max_sb_size); if (ret < 0) { ERR("Getting kernel max_sb_size"); + ret = -errno; goto end_unlock; } } + /* + * The original value is sent back if max stream size is larger than + * the possible size of the snapshot. Also, we asume that the session + * daemon should never send a maximum stream size that is lower than + * subbuffer size. + */ + consumed_pos = consumer_get_consumed_maxsize(consumed_pos, + produced_pos, max_stream_size); + while (consumed_pos < produced_pos) { ssize_t read_len; unsigned long len, padded_len; + health_code_update(); + DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos); ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos); if (ret < 0) { if (errno != EAGAIN) { PERROR("kernctl_get_subbuf snapshot"); + ret = -errno; goto end_unlock; } DBG("Kernel consumer get subbuf failed. Skipping it."); @@ -223,17 +244,19 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_get_subbuf_size(stream->wait_fd, &len); if (ret < 0) { ERR("Snapshot kernctl_get_subbuf_size"); + ret = -errno; goto error_put_subbuf; } ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len); if (ret < 0) { ERR("Snapshot kernctl_get_padded_subbuf_size"); + ret = -errno; goto error_put_subbuf; } read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, - padded_len - len); + padded_len - len, NULL); /* * We write the padded len in local tracefiles but the data len * when using a relay. Display the error but continue processing @@ -254,18 +277,21 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_put_subbuf(stream->wait_fd); if (ret < 0) { ERR("Snapshot kernctl_put_subbuf"); + ret = -errno; goto end_unlock; } consumed_pos += stream->max_sb_size; } if (relayd_id == (uint64_t) -1ULL) { - ret = close(stream->out_fd); - if (ret < 0) { - PERROR("Kernel consumer snapshot close out_fd"); - goto end_unlock; + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot close out_fd"); + goto end_unlock; + } + stream->out_fd = -1; } - stream->out_fd = -1; } else { close_relayd_stream(stream); stream->net_seq_idx = (uint64_t) -1ULL; @@ -280,6 +306,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, error_put_subbuf: ret = kernctl_put_subbuf(stream->wait_fd); if (ret < 0) { + ret = -errno; ERR("Snapshot kernctl_put_subbuf error path"); } end_unlock: @@ -333,7 +360,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, ret = utils_create_stream_file(path, metadata_stream->name, metadata_stream->chan->tracefile_size, metadata_stream->tracefile_count_current, - metadata_stream->uid, metadata_stream->gid); + metadata_stream->uid, metadata_stream->gid, NULL); if (ret < 0) { goto error; } @@ -341,10 +368,12 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, } do { + health_code_update(); + ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); if (ret_read < 0) { - if (ret_read != -EPERM) { - ERR("Kernel snapshot reading metadata subbuffer (ret: %ld)", + if (ret_read != -EAGAIN) { + ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", ret_read); goto error; } @@ -357,15 +386,17 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, close_relayd_stream(metadata_stream); metadata_stream->net_seq_idx = (uint64_t) -1ULL; } else { - ret = close(metadata_stream->out_fd); - if (ret < 0) { - PERROR("Kernel consumer snapshot metadata close out_fd"); - /* - * Don't go on error here since the snapshot was successful at this - * point but somehow the close failed. - */ + if (metadata_stream->out_fd >= 0) { + ret = close(metadata_stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot metadata close out_fd"); + /* + * Don't go on error here since the snapshot was successful at this + * point but somehow the close failed. + */ + } + metadata_stream->out_fd = -1; } - metadata_stream->out_fd = -1; } ret = 0; @@ -387,17 +418,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { ssize_t ret; - enum lttng_error_code ret_code = LTTNG_OK; + enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttcomm_consumer_msg msg; + health_code_update(); + ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); if (ret > 0) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); ret = -1; } return ret; } + + health_code_update(); + if (msg.cmd_type == LTTNG_CONSUMER_STOP) { /* * Notify the session daemon that the command is completed. @@ -410,6 +446,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return -ENOENT; } + health_code_update(); + /* relayd needs RCU read-side protection */ rcu_read_lock(); @@ -419,7 +457,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id); + &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -427,12 +466,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *new_channel; int ret_recv; + health_code_update(); + /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ goto error_fatal; } + + health_code_update(); + DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, msg.u.channel.session_id, msg.u.channel.pathname, @@ -440,7 +484,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.relayd_id, msg.u.channel.output, msg.u.channel.tracefile_size, msg.u.channel.tracefile_count, 0, - msg.u.channel.monitor); + msg.u.channel.monitor, + msg.u.channel.live_timer_interval); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -469,6 +514,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; }; + health_code_update(); + if (ctx->on_recv_channel != NULL) { ret_recv = ctx->on_recv_channel(new_channel); if (ret_recv == 0) { @@ -479,6 +526,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = consumer_add_channel(new_channel, ctx); } + if (CONSUMER_CHANNEL_TYPE_DATA) { + consumer_timer_live_start(new_channel, + msg.u.channel.live_timer_interval); + } + + health_code_update(); /* If we received an error in add_channel, we need to report it. */ if (ret < 0) { @@ -513,23 +566,33 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; } + health_code_update(); + /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ goto error_fatal; } - if (ret_code != LTTNG_OK) { + + health_code_update(); + + if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) { /* Channel was not found. */ goto end_nosignal; } /* Blocking call */ - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + health_poll_entry(); + ret = lttng_consumer_poll_socket(consumer_sockpoll); + health_poll_exit(); + if (ret < 0) { rcu_read_unlock(); return -EINTR; } + health_code_update(); + /* Get stream file descriptor from socket */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { @@ -538,6 +601,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } + health_code_update(); + /* * Send status code to session daemon only if the recv works. If the * above recv() failed, the session daemon is notified through the @@ -549,6 +614,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } + health_code_update(); + new_stream = consumer_allocate_stream(channel->key, fd, LTTNG_CONSUMER_ACTIVE_STREAM, @@ -606,6 +673,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ new_stream->hangup_flush_done = 0; + health_code_update(); + if (ctx->on_recv_stream) { ret = ctx->on_recv_stream(new_stream); if (ret < 0) { @@ -614,6 +683,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } + health_code_update(); + if (new_stream->metadata_flag) { channel->metadata_stream = new_stream; } @@ -639,17 +710,40 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { + ret = consumer_add_metadata_stream(new_stream); + if (ret) { + ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } stream_pipe = ctx->consumer_metadata_pipe; } else { + ret = consumer_add_data_stream(new_stream); + if (ret) { + ERR("Consumer add stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } stream_pipe = ctx->consumer_data_pipe; } + /* Vitible to other threads */ + new_stream->globally_visible = 1; + + health_code_update(); + ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); if (ret < 0) { ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", lttng_pipe_get_writefd(stream_pipe)); - consumer_stream_free(new_stream); + if (new_stream->metadata_flag) { + consumer_del_stream_for_metadata(new_stream); + } else { + consumer_del_stream_for_data(new_stream); + } goto end_nosignal; } @@ -690,6 +784,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_flag_relayd_for_destroy(relayd); } + health_code_update(); + ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ @@ -707,6 +803,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_data_pending(id); + health_code_update(); + /* Send back returned value to session daemon */ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); if (ret < 0) { @@ -733,13 +831,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key, msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, ctx); + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.max_stream_size, + ctx); if (ret < 0) { ERR("Snapshot channel failed"); ret_code = LTTNG_ERR_KERN_CHAN_FAIL; } } + health_code_update(); + ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ @@ -758,12 +860,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; } + health_code_update(); + ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ goto end_nosignal; } + health_code_update(); + /* * This command should ONLY be issued for channel with streams set in * no monitor mode. @@ -791,6 +897,7 @@ end_nosignal: * Return 1 to indicate success since the 0 value can be a socket * shutdown during the recv() or send() call. */ + health_code_update(); return 1; error_fatal: @@ -799,6 +906,97 @@ error_fatal: return -1; } +/* + * Populate index values of a kernel stream. Values are set in big endian order. + * + * Return 0 on success or else a negative value. + */ +static int get_index_values(struct lttng_packet_index *index, int infd) +{ + int ret; + + ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin); + if (ret < 0) { + PERROR("kernctl_get_timestamp_begin"); + goto error; + } + index->timestamp_begin = htobe64(index->timestamp_begin); + + ret = kernctl_get_timestamp_end(infd, &index->timestamp_end); + if (ret < 0) { + PERROR("kernctl_get_timestamp_end"); + goto error; + } + index->timestamp_end = htobe64(index->timestamp_end); + + ret = kernctl_get_events_discarded(infd, &index->events_discarded); + if (ret < 0) { + PERROR("kernctl_get_events_discarded"); + goto error; + } + index->events_discarded = htobe64(index->events_discarded); + + ret = kernctl_get_content_size(infd, &index->content_size); + if (ret < 0) { + PERROR("kernctl_get_content_size"); + goto error; + } + index->content_size = htobe64(index->content_size); + + ret = kernctl_get_packet_size(infd, &index->packet_size); + if (ret < 0) { + PERROR("kernctl_get_packet_size"); + goto error; + } + index->packet_size = htobe64(index->packet_size); + + ret = kernctl_get_stream_id(infd, &index->stream_id); + if (ret < 0) { + PERROR("kernctl_get_stream_id"); + goto error; + } + index->stream_id = htobe64(index->stream_id); + +error: + return ret; +} +/* + * Sync metadata meaning request them to the session daemon and snapshot to the + * metadata thread can consumer them. + * + * Metadata stream lock MUST be acquired. + * + * Return 0 if new metadatda is available, EAGAIN if the metadata stream + * is empty or a negative value on error. + */ +int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata) +{ + int ret; + + assert(metadata); + + ret = kernctl_buffer_flush(metadata->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + + ret = kernctl_snapshot(metadata->wait_fd); + if (ret < 0) { + if (errno != EAGAIN) { + ERR("Sync metadata, taking kernel snapshot failed."); + goto end; + } + DBG("Sync metadata, no new kernel metadata"); + /* No new metadata, exit. */ + ret = ENODATA; + goto end; + } + +end: + return ret; +} + /* * Consume data on a file descriptor and write it on a trace file. */ @@ -806,15 +1004,16 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err; + int err, write_index = 1; ssize_t ret = 0; int infd = stream->wait_fd; + struct lttng_packet_index index; DBG("In read_subbuffer (infd : %d)", infd); + /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { - ret = err; /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -823,18 +1022,27 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, */ DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency)"); + ret = -errno; goto end; } /* Get the full subbuffer size including padding */ err = kernctl_get_padded_subbuf_size(infd, &len); if (err != 0) { - errno = -err; perror("Getting sub-buffer len failed."); - ret = err; + ret = -errno; goto end; } + if (!stream->metadata_flag) { + ret = get_index_values(&index, infd); + if (ret < 0) { + goto end; + } + } else { + write_index = 0; + } + switch (stream->chan->output) { case CONSUMER_CHANNEL_SPLICE: /* @@ -847,7 +1055,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* splice the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size, - padding); + padding, &index); /* * XXX: Splice does not support network streaming so the return value * is simply checked against subbuf_size and not like the mmap() op. @@ -859,15 +1067,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, */ ERR("Error splicing to tracefile (ret: %zd != len: %lu)", ret, subbuf_size); + write_index = 0; } break; case CONSUMER_CHANNEL_MMAP: /* Get subbuffer size without padding */ err = kernctl_get_subbuf_size(infd, &subbuf_size); if (err != 0) { - errno = -err; perror("Getting sub-buffer len failed."); - ret = err; + ret = -errno; goto end; } @@ -878,7 +1086,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, - padding); + padding, &index); /* * The mmap operation should write subbuf_size amount of data when * network streaming or the full padding (len) size when we are _not_ @@ -893,24 +1101,43 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ERR("Error writing to tracefile " "(ret: %zd != len: %lu != subbuf_size: %lu)", ret, len, subbuf_size); + write_index = 0; } break; default: ERR("Unknown output method"); - ret = -1; + ret = -EPERM; } err = kernctl_put_next_subbuf(infd); if (err != 0) { - errno = -err; if (errno == EFAULT) { perror("Error in unreserving sub buffer\n"); } else if (errno == EIO) { /* Should never happen with newer LTTng versions */ perror("Reader has been pushed by the writer, last sub-buffer corrupted."); } + ret = -errno; + goto end; + } + + /* Write index if needed. */ + if (!write_index) { + goto end; + } + + if (stream->chan->live_timer_interval && !stream->metadata_flag) { + /* + * In live, block until all the metadata is sent. + */ + err = consumer_stream_sync_metadata(ctx, stream->session_id); + if (err < 0) { + goto end; + } + } - ret = -err; + err = consumer_stream_write_index(stream, &index); + if (err < 0) { goto end; } @@ -931,12 +1158,23 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { ret = utils_create_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid); + stream->uid, stream->gid, NULL); if (ret < 0) { goto error; } stream->out_fd = ret; stream->tracefile_size_current = 0; + + if (!stream->metadata_flag) { + ret = index_create_file(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto error; + } + stream->index_fd = ret; + } } if (stream->output == LTTNG_EVENT_MMAP) { @@ -945,8 +1183,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len); if (ret != 0) { - errno = -ret; PERROR("kernctl_get_mmap_len"); + ret = -errno; goto error_close_fd; } stream->mmap_len = (size_t) mmap_len; @@ -989,6 +1227,11 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream) assert(stream); + if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { + ret = 0; + goto end; + } + ret = kernctl_get_next_subbuf(stream->wait_fd); if (ret == 0) { /* There is still data so let's put back this subbuffer. */