Keep read-only copies of fields from the channel to the stream
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index bdd5ec86d03d926c45b3bebb54e3a1701e4147c8..539d80b0b4a32223054a08be4116fda25f7986eb 100644 (file)
@@ -60,7 +60,11 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
        int infd = stream->wait_fd;
 
        ret = kernctl_snapshot(infd);
-       if (ret != 0) {
+       /*
+        * -EAGAIN is not an error, it just means that there is no data to
+        *  be read.
+        */
+       if (ret != 0 && ret != -EAGAIN) {
                PERROR("Getting sub-buffer snapshot.");
        }
 
@@ -187,14 +191,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
                                        path, stream->name, stream->key);
                }
-               if (relayd_id != -1ULL) {
-                       ret = consumer_send_relayd_streams_sent(relayd_id);
-                       if (ret < 0) {
-                               ERR("sending streams sent to relayd");
-                               goto end_unlock;
-                       }
-                       channel->streams_sent_to_relayd = true;
-               }
 
                ret = kernctl_buffer_flush_empty(stream->wait_fd);
                if (ret < 0) {
@@ -465,10 +461,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
                /* Session daemon status message are handled in the following call. */
-               ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+               consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
                                msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
                                &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
-                                msg.u.relayd_sock.relayd_session_id);
+                               msg.u.relayd_sock.relayd_session_id);
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_ADD_CHANNEL:
@@ -662,6 +658,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                new_stream->chan = channel;
                new_stream->wait_fd = fd;
+               consumer_stream_update_channel_attributes(new_stream,
+                               channel);
                switch (channel->output) {
                case CONSUMER_CHANNEL_SPLICE:
                        new_stream->output = LTTNG_EVENT_SPLICE;
@@ -748,26 +746,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
-                       ret = consumer_add_metadata_stream(new_stream);
-                       if (ret) {
-                               ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
-                                               new_stream->key);
-                               consumer_stream_free(new_stream);
-                               goto end_nosignal;
-                       }
+                       consumer_add_metadata_stream(new_stream);
                        stream_pipe = ctx->consumer_metadata_pipe;
                } else {
-                       ret = consumer_add_data_stream(new_stream);
-                       if (ret) {
-                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
-                                               new_stream->key);
-                               consumer_stream_free(new_stream);
-                               goto end_nosignal;
-                       }
+                       consumer_add_data_stream(new_stream);
                        stream_pipe = ctx->consumer_data_pipe;
                }
 
-               /* Vitible to other threads */
+               /* Visible to other threads */
                new_stream->globally_visible = 1;
 
                health_code_update();
@@ -983,7 +969,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_DISCARDED_EVENTS:
        {
-               uint64_t ret;
+               ssize_t ret;
+               uint64_t count;
                struct lttng_consumer_channel *channel;
                uint64_t id = msg.u.discarded_events.session_id;
                uint64_t key = msg.u.discarded_events.channel_key;
@@ -995,15 +982,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (!channel) {
                        ERR("Kernel consumer discarded events channel %"
                                        PRIu64 " not found", key);
-                       ret = 0;
+                       count = 0;
                } else {
-                       ret = channel->discarded_events;
+                       count = channel->discarded_events;
                }
 
                health_code_update();
 
                /* Send back returned value to session daemon */
-               ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+               ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
                if (ret < 0) {
                        PERROR("send discarded events");
                        goto error_fatal;
@@ -1013,7 +1000,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_LOST_PACKETS:
        {
-               uint64_t ret;
+               ssize_t ret;
+               uint64_t count;
                struct lttng_consumer_channel *channel;
                uint64_t id = msg.u.lost_packets.session_id;
                uint64_t key = msg.u.lost_packets.channel_key;
@@ -1025,15 +1013,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (!channel) {
                        ERR("Kernel consumer lost packets channel %"
                                        PRIu64 " not found", key);
-                       ret = 0;
+                       count = 0;
                } else {
-                       ret = channel->lost_packets;
+                       count = channel->lost_packets;
                }
 
                health_code_update();
 
                /* Send back returned value to session daemon */
-               ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+               ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
                if (ret < 0) {
                        PERROR("send lost packets");
                        goto error_fatal;
@@ -1090,6 +1078,54 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                break;
        }
+       case LTTNG_CONSUMER_ROTATE_RENAME:
+       {
+               DBG("Consumer rename session %" PRIu64 " after rotation, old path = \"%s\", new path = \"%s\"",
+                               msg.u.rotate_rename.session_id,
+                               msg.u.rotate_rename.old_path,
+                               msg.u.rotate_rename.new_path);
+               ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.old_path,
+                               msg.u.rotate_rename.new_path,
+                               msg.u.rotate_rename.uid,
+                               msg.u.rotate_rename.gid,
+                               msg.u.rotate_rename.relayd_id);
+               if (ret < 0) {
+                       ERR("Rotate rename failed");
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               }
+
+               health_code_update();
+
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+               break;
+       }
+       case LTTNG_CONSUMER_MKDIR:
+       {
+               DBG("Consumer mkdir %s in session %" PRIu64,
+                               msg.u.mkdir.path,
+                               msg.u.mkdir.session_id);
+               ret = lttng_consumer_mkdir(msg.u.mkdir.path,
+                               msg.u.mkdir.uid,
+                               msg.u.mkdir.gid,
+                               msg.u.mkdir.relayd_id);
+               if (ret < 0) {
+                       ERR("consumer mkdir failed");
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               }
+
+               health_code_update();
+
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+               break;
+       }
        default:
                goto end_nosignal;
        }
@@ -1166,7 +1202,6 @@ static int get_index_values(struct ctf_packet_index *index, int infd)
                if (ret == -ENOTTY) {
                        /* Command not implemented by lttng-modules. */
                        index->stream_instance_id = -1ULL;
-                       ret = 0;
                } else {
                        PERROR("kernctl_get_instance_id");
                        goto error;
@@ -1238,7 +1273,6 @@ int update_stream_stats(struct lttng_consumer_stream *stream)
                if (ret == -ENOTTY) {
                        /* Command not implemented by lttng-modules. */
                        seq = -1ULL;
-                       ret = 0;
                } else {
                        PERROR("kernctl_get_sequence_number");
                        goto end;
This page took 0.028803 seconds and 5 git commands to generate.