X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=1b01d414a0248b0207bbf72aba620a8fa62b93e4;hp=09ccda329526b69cf1e5aa147ed32f4dc8a67e81;hb=0c759fc95033a3d6d7cb939f39dd643ce7e127ee;hpb=309167d2a6f59d0c8cbf64eb23ba912cdea76a34 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 09ccda329..1b01d414a 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -39,6 +40,7 @@ #include #include #include +#include #include "kernel-consumer.h" @@ -138,6 +140,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, } cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + + health_code_update(); + /* * Lock stream because we are about to change its state. */ @@ -220,6 +225,8 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ssize_t read_len; unsigned long len, padded_len; + health_code_update(); + DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos); ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos); @@ -361,6 +368,8 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, } do { + health_code_update(); + ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); if (ret_read < 0) { if (ret_read != -EAGAIN) { @@ -409,9 +418,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { ssize_t ret; - enum lttng_error_code ret_code = LTTNG_OK; + enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttcomm_consumer_msg msg; + health_code_update(); + ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { if (ret > 0) { @@ -420,6 +431,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } return ret; } + + health_code_update(); + if (msg.cmd_type == LTTNG_CONSUMER_STOP) { /* * Notify the session daemon that the command is completed. @@ -432,6 +446,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return -ENOENT; } + health_code_update(); + /* relayd needs RCU read-side protection */ rcu_read_lock(); @@ -441,7 +457,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id); + &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -449,12 +466,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *new_channel; int ret_recv; + health_code_update(); + /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ goto error_fatal; } + + health_code_update(); + DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, msg.u.channel.session_id, msg.u.channel.pathname, @@ -462,7 +484,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.relayd_id, msg.u.channel.output, msg.u.channel.tracefile_size, msg.u.channel.tracefile_count, 0, - msg.u.channel.monitor); + msg.u.channel.monitor, + msg.u.channel.live_timer_interval); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -491,6 +514,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; }; + health_code_update(); + if (ctx->on_recv_channel != NULL) { ret_recv = ctx->on_recv_channel(new_channel); if (ret_recv == 0) { @@ -501,6 +526,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = consumer_add_channel(new_channel, ctx); } + if (CONSUMER_CHANNEL_TYPE_DATA) { + consumer_timer_live_start(new_channel, + msg.u.channel.live_timer_interval); + } + + health_code_update(); /* If we received an error in add_channel, we need to report it. */ if (ret < 0) { @@ -535,23 +566,33 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; } + health_code_update(); + /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ goto error_fatal; } - if (ret_code != LTTNG_OK) { + + health_code_update(); + + if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) { /* Channel was not found. */ goto end_nosignal; } /* Blocking call */ - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + health_poll_entry(); + ret = lttng_consumer_poll_socket(consumer_sockpoll); + health_poll_exit(); + if (ret < 0) { rcu_read_unlock(); return -EINTR; } + health_code_update(); + /* Get stream file descriptor from socket */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { @@ -560,6 +601,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } + health_code_update(); + /* * Send status code to session daemon only if the recv works. If the * above recv() failed, the session daemon is notified through the @@ -571,6 +614,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } + health_code_update(); + new_stream = consumer_allocate_stream(channel->key, fd, LTTNG_CONSUMER_ACTIVE_STREAM, @@ -628,6 +673,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ new_stream->hangup_flush_done = 0; + health_code_update(); + if (ctx->on_recv_stream) { ret = ctx->on_recv_stream(new_stream); if (ret < 0) { @@ -636,6 +683,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } + health_code_update(); + if (new_stream->metadata_flag) { channel->metadata_stream = new_stream; } @@ -683,6 +732,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Vitible to other threads */ new_stream->globally_visible = 1; + health_code_update(); + ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); if (ret < 0) { ERR("Consumer write %s stream to pipe %d", @@ -733,6 +784,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_flag_relayd_for_destroy(relayd); } + health_code_update(); + ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ @@ -750,6 +803,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_data_pending(id); + health_code_update(); + /* Send back returned value to session daemon */ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); if (ret < 0) { @@ -785,6 +840,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } + health_code_update(); + ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ @@ -803,12 +860,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; } + health_code_update(); + ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ goto end_nosignal; } + health_code_update(); + /* * This command should ONLY be issued for channel with streams set in * no monitor mode. @@ -836,6 +897,7 @@ end_nosignal: * Return 1 to indicate success since the 0 value can be a socket * shutdown during the recv() or send() call. */ + health_code_update(); return 1; error_fatal: @@ -898,6 +960,42 @@ static int get_index_values(struct lttng_packet_index *index, int infd) error: return ret; } +/* + * Sync metadata meaning request them to the session daemon and snapshot to the + * metadata thread can consumer them. + * + * Metadata stream lock MUST be acquired. + * + * Return 0 if new metadatda is available, EAGAIN if the metadata stream + * is empty or a negative value on error. + */ +int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata) +{ + int ret; + + assert(metadata); + + ret = kernctl_buffer_flush(metadata->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + + ret = kernctl_snapshot(metadata->wait_fd); + if (ret < 0) { + if (errno != EAGAIN) { + ERR("Sync metadata, taking kernel snapshot failed."); + goto end; + } + DBG("Sync metadata, no new kernel metadata"); + /* No new metadata, exit. */ + ret = ENODATA; + goto end; + } + +end: + return ret; +} /* * Consume data on a file descriptor and write it on a trace file. @@ -906,18 +1004,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 0; + int err, write_index = 1; ssize_t ret = 0; int infd = stream->wait_fd; struct lttng_packet_index index; DBG("In read_subbuffer (infd : %d)", infd); - /* Indicate that for this stream we have to write the index. */ - if (stream->index_fd >= 0) { - write_index = 1; - } - /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { @@ -941,11 +1034,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } - if (!stream->metadata_flag && write_index) { + if (!stream->metadata_flag) { ret = get_index_values(&index, infd); if (ret < 0) { goto end; } + } else { + write_index = 0; } switch (stream->chan->output) { @@ -1027,14 +1122,25 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } /* Write index if needed. */ - if (write_index) { - err = index_write(stream->index_fd, &index, sizeof(index)); + if (!write_index) { + goto end; + } + + if (stream->chan->live_timer_interval && !stream->metadata_flag) { + /* + * In live, block until all the metadata is sent. + */ + err = consumer_stream_sync_metadata(ctx, stream->session_id); if (err < 0) { - ret = -1; goto end; } } + err = consumer_stream_write_index(stream, &index); + if (err < 0) { + goto end; + } + end: return ret; }