X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=539d80b0b4a32223054a08be4116fda25f7986eb;hp=bdd5ec86d03d926c45b3bebb54e3a1701e4147c8;hb=d9a2e16ee3abce83801f58473831330aa8a5463b;hpb=ddc93ee4c528738932c2369715c0448bfd7ff023 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index bdd5ec86d..539d80b0b 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -60,7 +60,11 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) int infd = stream->wait_fd; ret = kernctl_snapshot(infd); - if (ret != 0) { + /* + * -EAGAIN is not an error, it just means that there is no data to + * be read. + */ + if (ret != 0 && ret != -EAGAIN) { PERROR("Getting sub-buffer snapshot."); } @@ -187,14 +191,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")", path, stream->name, stream->key); } - if (relayd_id != -1ULL) { - ret = consumer_send_relayd_streams_sent(relayd_id); - if (ret < 0) { - ERR("sending streams sent to relayd"); - goto end_unlock; - } - channel->streams_sent_to_relayd = true; - } ret = kernctl_buffer_flush_empty(stream->wait_fd); if (ret < 0) { @@ -465,10 +461,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { /* Session daemon status message are handled in the following call. */ - ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, + consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, - msg.u.relayd_sock.relayd_session_id); + msg.u.relayd_sock.relayd_session_id); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -662,6 +658,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_stream->chan = channel; new_stream->wait_fd = fd; + consumer_stream_update_channel_attributes(new_stream, + channel); switch (channel->output) { case CONSUMER_CHANNEL_SPLICE: new_stream->output = LTTNG_EVENT_SPLICE; @@ -748,26 +746,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { - ret = consumer_add_metadata_stream(new_stream); - if (ret) { - ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing", - new_stream->key); - consumer_stream_free(new_stream); - goto end_nosignal; - } + consumer_add_metadata_stream(new_stream); stream_pipe = ctx->consumer_metadata_pipe; } else { - ret = consumer_add_data_stream(new_stream); - if (ret) { - ERR("Consumer add stream %" PRIu64 " failed. Continuing", - new_stream->key); - consumer_stream_free(new_stream); - goto end_nosignal; - } + consumer_add_data_stream(new_stream); stream_pipe = ctx->consumer_data_pipe; } - /* Vitible to other threads */ + /* Visible to other threads */ new_stream->globally_visible = 1; health_code_update(); @@ -983,7 +969,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_DISCARDED_EVENTS: { - uint64_t ret; + ssize_t ret; + uint64_t count; struct lttng_consumer_channel *channel; uint64_t id = msg.u.discarded_events.session_id; uint64_t key = msg.u.discarded_events.channel_key; @@ -995,15 +982,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (!channel) { ERR("Kernel consumer discarded events channel %" PRIu64 " not found", key); - ret = 0; + count = 0; } else { - ret = channel->discarded_events; + count = channel->discarded_events; } health_code_update(); /* Send back returned value to session daemon */ - ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + ret = lttcomm_send_unix_sock(sock, &count, sizeof(count)); if (ret < 0) { PERROR("send discarded events"); goto error_fatal; @@ -1013,7 +1000,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_LOST_PACKETS: { - uint64_t ret; + ssize_t ret; + uint64_t count; struct lttng_consumer_channel *channel; uint64_t id = msg.u.lost_packets.session_id; uint64_t key = msg.u.lost_packets.channel_key; @@ -1025,15 +1013,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (!channel) { ERR("Kernel consumer lost packets channel %" PRIu64 " not found", key); - ret = 0; + count = 0; } else { - ret = channel->lost_packets; + count = channel->lost_packets; } health_code_update(); /* Send back returned value to session daemon */ - ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + ret = lttcomm_send_unix_sock(sock, &count, sizeof(count)); if (ret < 0) { PERROR("send lost packets"); goto error_fatal; @@ -1090,6 +1078,54 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } + case LTTNG_CONSUMER_ROTATE_RENAME: + { + DBG("Consumer rename session %" PRIu64 " after rotation, old path = \"%s\", new path = \"%s\"", + msg.u.rotate_rename.session_id, + msg.u.rotate_rename.old_path, + msg.u.rotate_rename.new_path); + ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.old_path, + msg.u.rotate_rename.new_path, + msg.u.rotate_rename.uid, + msg.u.rotate_rename.gid, + msg.u.rotate_rename.relayd_id); + if (ret < 0) { + ERR("Rotate rename failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } + case LTTNG_CONSUMER_MKDIR: + { + DBG("Consumer mkdir %s in session %" PRIu64, + msg.u.mkdir.path, + msg.u.mkdir.session_id); + ret = lttng_consumer_mkdir(msg.u.mkdir.path, + msg.u.mkdir.uid, + msg.u.mkdir.gid, + msg.u.mkdir.relayd_id); + if (ret < 0) { + ERR("consumer mkdir failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } default: goto end_nosignal; } @@ -1166,7 +1202,6 @@ static int get_index_values(struct ctf_packet_index *index, int infd) if (ret == -ENOTTY) { /* Command not implemented by lttng-modules. */ index->stream_instance_id = -1ULL; - ret = 0; } else { PERROR("kernctl_get_instance_id"); goto error; @@ -1238,7 +1273,6 @@ int update_stream_stats(struct lttng_consumer_stream *stream) if (ret == -ENOTTY) { /* Command not implemented by lttng-modules. */ seq = -1ULL; - ret = 0; } else { PERROR("kernctl_get_sequence_number"); goto end;