X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=b8695698da7301b69b114fe373680b248294b486;hp=221c348df055722918793e32617b6812dcfbc680;hb=d3e2ba59faddb31870e2ce29b6a881f7ad5ad883;hpb=56591bac20c0f3b728c95d92702d243de838bdc4 diff --git a/src/common/consumer.c b/src/common/consumer.c index 221c348df..b8695698d 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -33,12 +33,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include "consumer.h" #include "consumer-stream.h" @@ -304,6 +306,10 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) consumer_stream_destroy(stream, NULL); } + if (channel->live_timer_enabled == 1) { + consumer_timer_live_stop(channel); + } + switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; @@ -498,6 +504,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->key = stream_key; stream->out_fd = -1; stream->out_fd_offset = 0; + stream->output_written = 0; stream->state = state; stream->uid = uid; stream->gid = gid; @@ -505,6 +512,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->session_id = session_id; stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; + stream->index_fd = -1; pthread_mutex_init(&stream->lock, NULL); /* If channel is the metadata, flag this stream as metadata. */ @@ -726,6 +734,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, if (ret < 0) { goto end; } + uatomic_inc(&relayd->refcount); stream->sent_to_relayd = 1; } else { @@ -836,7 +845,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t tracefile_size, uint64_t tracefile_count, uint64_t session_id_per_pid, - unsigned int monitor) + unsigned int monitor, + unsigned int live_timer_interval) { struct lttng_consumer_channel *channel; @@ -857,6 +867,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; channel->monitor = monitor; + channel->live_timer_interval = live_timer_interval; pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->timer_lock, NULL); @@ -1316,7 +1327,8 @@ end: ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding) + unsigned long padding, + struct lttng_packet_index *index) { unsigned long mmap_offset; void *mmap_base; @@ -1423,16 +1435,34 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( ret = utils_rotate_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->chan->tracefile_count, stream->uid, stream->gid, - stream->out_fd, &(stream->tracefile_count_current)); + stream->out_fd, &(stream->tracefile_count_current), + &stream->out_fd); if (ret < 0) { ERR("Rotating output file"); goto end; } - outfd = stream->out_fd = ret; + outfd = stream->out_fd; + + if (stream->index_fd >= 0) { + ret = index_create_file(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end; + } + stream->index_fd = ret; + } + /* Reset current size because we just perform a rotation. */ stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; + orig_offset = 0; } stream->tracefile_size_current += len; + if (index) { + index->offset = htobe64(stream->out_fd_offset); + } } while (len > 0) { @@ -1473,6 +1503,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( SYNC_FILE_RANGE_WRITE); stream->out_fd_offset += ret; } + stream->output_written += ret; written += ret; } lttng_consumer_sync_trace_file(stream, orig_offset); @@ -1506,7 +1537,8 @@ end: ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding) + unsigned long padding, + struct lttng_packet_index *index) { ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; @@ -1606,16 +1638,32 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( ret = utils_rotate_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->chan->tracefile_count, stream->uid, stream->gid, - stream->out_fd, &(stream->tracefile_count_current)); + stream->out_fd, &(stream->tracefile_count_current), + &stream->out_fd); if (ret < 0) { ERR("Rotating output file"); goto end; } - outfd = stream->out_fd = ret; + outfd = stream->out_fd; + + if (stream->index_fd >= 0) { + ret = index_create_file(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end; + } + stream->index_fd = ret; + } + /* Reset current size because we just perform a rotation. */ stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; + orig_offset = 0; } stream->tracefile_size_current += len; + index->offset = htobe64(stream->out_fd_offset); } while (len > 0) { @@ -1684,6 +1732,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( SYNC_FILE_RANGE_WRITE); stream->out_fd_offset += ret_splice; } + stream->output_written += ret_splice; written += ret_splice; } lttng_consumer_sync_trace_file(stream, orig_offset); @@ -2190,11 +2239,6 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - /* Just don't waste time if no returned events for the fd */ - if (!revents) { - continue; - } - if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { if (revents & (LPOLLERR | LPOLLHUP )) { DBG("Metadata thread pipe hung up"); @@ -3073,7 +3117,8 @@ void lttng_consumer_init(void) int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, - struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id) + struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, + uint64_t relayd_session_id) { int fd = -1, ret = -1, relayd_created = 0; enum lttng_error_code ret_code = LTTNG_OK; @@ -3173,29 +3218,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, relayd->control_sock.major = relayd_sock->major; relayd->control_sock.minor = relayd_sock->minor; - /* - * Create a session on the relayd and store the returned id. Lock the - * control socket mutex if the relayd was NOT created before. - */ - if (!relayd_created) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - } - ret = relayd_create_session(&relayd->control_sock, - &relayd->relayd_session_id); - if (!relayd_created) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - } - if (ret < 0) { - /* - * Close all sockets of a relayd object. It will be freed if it was - * created at the error code path or else it will be garbage - * collect. - */ - (void) relayd_close(&relayd->control_sock); - (void) relayd_close(&relayd->data_sock); - ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; - goto error; - } + relayd->relayd_session_id = relayd_session_id; break; case LTTNG_STREAM_DATA: @@ -3396,6 +3419,15 @@ int consumer_data_pending(uint64_t id) */ ret = cds_lfht_is_node_deleted(&stream->node.node); if (!ret) { + /* + * An empty output file is not valid. We need at least one packet + * generated per stream, even if it contains no event, so it + * contains at least one packet header. + */ + if (stream->output_written == 0) { + pthread_mutex_unlock(&stream->lock); + goto data_pending; + } /* Check the stream if there is data in the buffers. */ ret = data_pending(stream); if (ret == 1) {