Add kernel snapshot support
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 86428f0663141fa250905e269d7ceea22c5cb43d..29ef3a4ce426d1db8755796fdafffceed43525b6 100644 (file)
@@ -37,6 +37,7 @@
 #include <common/pipe.h>
 #include <common/relayd/relayd.h>
 #include <common/utils.h>
+#include <common/consumer-stream.h>
 
 #include "kernel-consumer.h"
 
@@ -83,6 +84,321 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
        return ret;
 }
 
+/*
+ * Get the consumerd position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+               unsigned long *pos)
+{
+       int ret;
+       int infd = stream->wait_fd;
+
+       ret = kernctl_snapshot_get_consumed(infd, pos);
+       if (ret != 0) {
+               errno = -ret;
+               perror("kernctl_snapshot_get_consumed");
+       }
+
+       return ret;
+}
+
+/*
+ * Find a relayd and send the stream
+ *
+ * Returns 0 on success, < 0 on error
+ */
+static
+int send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
+{
+       struct consumer_relayd_sock_pair *relayd;
+       int ret = 0;
+       char *stream_path;
+
+       if (path != NULL) {
+               stream_path = path;
+       } else {
+               stream_path = stream->chan->pathname;
+       }
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd != NULL) {
+               /* Add stream on the relayd */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_add_stream(&relayd->control_sock,
+                               stream->name, stream_path,
+                               &stream->relayd_stream_id,
+                               stream->chan->tracefile_size,
+                               stream->chan->tracefile_count);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       goto end;
+               }
+               uatomic_inc(&relayd->refcount);
+       } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
+               ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
+                               stream->net_seq_idx);
+               ret = -1;
+               goto end;
+       }
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Find a relayd and close the stream
+ */
+static
+void close_relayd_stream(struct lttng_consumer_stream *stream)
+{
+       struct consumer_relayd_sock_pair *relayd;
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd != NULL) {
+               consumer_stream_relayd_close(stream, relayd);
+       }
+       rcu_read_unlock();
+}
+
+/*
+ * Take a snapshot of all the stream of a channel
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
+               uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       unsigned long consumed_pos, produced_pos;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+
+       DBG("Kernel consumer snapshot channel %lu", key);
+
+       rcu_read_lock();
+
+       channel = consumer_find_channel(key);
+       if (!channel) {
+               ERR("No channel found for key %lu", key);
+               ret = -1;
+               goto end;
+       }
+
+       /* Splice is not supported yet for channel snapshot. */
+       if (channel->output != CONSUMER_CHANNEL_MMAP) {
+               ERR("Unsupported output %d", channel->output);
+               ret = -1;
+               goto end;
+       }
+
+       cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head,
+                       no_monitor_node) {
+               /*
+                * Lock stream because we are about to change its state.
+                */
+               pthread_mutex_lock(&stream->lock);
+
+               stream->net_seq_idx = relayd_id;
+               channel->relayd_id = relayd_id;
+               if (relayd_id != (uint64_t) -1ULL) {
+                       ret = send_relayd_stream(stream, path);
+                       if (ret < 0) {
+                               ERR("sending stream to relayd");
+                               goto end_unlock;
+                       }
+                       DBG("Stream %s sent to the relayd", stream->name);
+               } else {
+                       ret = utils_create_stream_file(path, stream->name,
+                                       stream->chan->tracefile_size, stream->tracefile_count_current,
+                                       stream->uid, stream->gid);
+                       if (ret < 0) {
+                               ERR("utils_create_stream_file");
+                               goto end_unlock;
+                       }
+
+                       stream->out_fd = ret;
+                       stream->tracefile_size_current = 0;
+
+                       DBG("Kernel consumer snapshot stream %s/%s (%lu)", path,
+                                       stream->name, stream->key);
+               }
+
+               ret = kernctl_buffer_flush(stream->wait_fd);
+               if (ret < 0) {
+                       ERR("Failed to flush kernel metadata stream");
+                       goto end_unlock;
+               }
+
+               ret = lttng_kconsumer_take_snapshot(stream);
+               if (ret < 0) {
+                       ERR("Taking kernel snapshot");
+                       goto end_unlock;
+               }
+
+               ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
+               if (ret < 0) {
+                       ERR("Produced kernel snapshot position");
+                       goto end_unlock;
+               }
+
+               ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
+               if (ret < 0) {
+                       ERR("Consumerd kernel snapshot position");
+                       goto end_unlock;
+               }
+
+               if (stream->max_sb_size == 0) {
+                       ret = kernctl_get_max_subbuf_size(stream->wait_fd,
+                                       &stream->max_sb_size);
+                       if (ret < 0) {
+                               ERR("Getting kernel max_sb_size");
+                               goto end_unlock;
+                       }
+               }
+
+               while (consumed_pos < produced_pos) {
+                       ssize_t read_len;
+                       unsigned long len, padded_len;
+
+                       DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
+
+                       ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
+                       if (ret < 0) {
+                               if (errno != EAGAIN) {
+                                       PERROR("kernctl_get_subbuf snapshot");
+                                       goto end_unlock;
+                               }
+                               DBG("Kernel consumer get subbuf failed. Skipping it.");
+                               consumed_pos += stream->max_sb_size;
+                               continue;
+                       }
+
+                       ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
+                       if (ret < 0) {
+                               ERR("Snapshot kernctl_get_subbuf_size");
+                               goto end_unlock;
+                       }
+
+                       ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
+                       if (ret < 0) {
+                               ERR("Snapshot kernctl_get_padded_subbuf_size");
+                               goto end_unlock;
+                       }
+
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
+                                       padded_len - len);
+                       /*
+                        * We write the padded len in local tracefiles but the
+                        * data len when using a relay.
+                        * Display the error but continue processing to try to
+                        * release the subbuffer.
+                        */
+                       if (relayd_id != (uint64_t) -1ULL) {
+                               if (read_len != len) {
+                                       ERR("Error sending to the relay (ret: %zd != len: %lu)",
+                                                       read_len, len);
+                               }
+                       } else {
+                               if (read_len != padded_len) {
+                                       ERR("Error writing to tracefile (ret: %zd != len: %lu)",
+                                                       read_len, padded_len);
+                               }
+                       }
+
+                       ret = kernctl_put_subbuf(stream->wait_fd);
+                       if (ret < 0) {
+                               ERR("Snapshot kernctl_put_subbuf");
+                               goto end_unlock;
+                       }
+                       consumed_pos += stream->max_sb_size;
+               }
+
+               if (relayd_id == (uint64_t) -1ULL) {
+                       ret = close(stream->out_fd);
+                       if (ret < 0) {
+                               PERROR("Kernel consumer snapshot close out_fd");
+                               goto end_unlock;
+                       }
+                       stream->out_fd = -1;
+               } else {
+                       close_relayd_stream(stream);
+                       stream->net_seq_idx = (uint64_t) -1ULL;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       /* All good! */
+       ret = 0;
+       goto end;
+
+end_unlock:
+       pthread_mutex_unlock(&stream->lock);
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Read the whole metadata available for a snapshot.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
+               struct lttng_consumer_local_data *ctx)
+{
+       struct lttng_consumer_channel *metadata_channel;
+       struct lttng_consumer_stream *metadata_stream;
+       int ret;
+
+       DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
+                       key, path);
+
+       rcu_read_lock();
+
+       metadata_channel = consumer_find_channel(key);
+       if (!metadata_channel) {
+               ERR("Snapshot kernel metadata channel not found for key %lu", key);
+               ret = -1;
+               goto end;
+       }
+
+       metadata_stream = metadata_channel->metadata_stream;
+       assert(metadata_stream);
+
+       ret = utils_create_stream_file(path, metadata_stream->name,
+                       metadata_stream->chan->tracefile_size,
+                       metadata_stream->tracefile_count_current,
+                       metadata_stream->uid, metadata_stream->gid);
+       if (ret < 0) {
+               goto end;
+       }
+       metadata_stream->out_fd = ret;
+
+       ret = 0;
+       while (ret >= 0) {
+               ret = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+               if (ret < 0) {
+                       if (ret != -EPERM) {
+                               ERR("Kernel snapshot reading subbuffer");
+                               goto end;
+                       }
+                       /* "ret" is negative at this point so we will exit the loop. */
+                       continue;
+               }
+       }
+
+       ret = 0;
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
 /*
  * Receive command from session daemon and process it.
  *
@@ -189,7 +505,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        {
                int fd;
                struct lttng_pipe *stream_pipe;
-               struct consumer_relayd_sock_pair *relayd = NULL;
                struct lttng_consumer_stream *new_stream;
                struct lttng_consumer_channel *channel;
                int alloc_ret = 0;
@@ -272,12 +587,28 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                new_stream->chan = channel;
                new_stream->wait_fd = fd;
+               switch (channel->output) {
+               case CONSUMER_CHANNEL_SPLICE:
+                       new_stream->output = LTTNG_EVENT_SPLICE;
+                       break;
+               case CONSUMER_CHANNEL_MMAP:
+                       new_stream->output = LTTNG_EVENT_MMAP;
+                       break;
+               default:
+                       ERR("Stream output unknown %d", channel->output);
+                       goto end_nosignal;
+               }
 
                /*
                 * We've just assigned the channel to the stream so increment the
-                * refcount right now.
+                * refcount right now. We don't need to increment the refcount for
+                * streams in no monitor because we handle manually the cleanup of
+                * those. It is very important to make sure there is NO prior
+                * consumer_del_stream() calls or else the refcount will be unbalanced.
                 */
-               uatomic_inc(&new_stream->chan->refcount);
+               if (channel->monitor) {
+                       uatomic_inc(&new_stream->chan->refcount);
+               }
 
                /*
                 * The buffer flush is done on the session daemon side for the kernel
@@ -288,24 +619,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 */
                new_stream->hangup_flush_done = 0;
 
-               /* The stream is not metadata. Get relayd reference if exists. */
-               relayd = consumer_find_relayd(new_stream->net_seq_idx);
-               if (relayd != NULL) {
-                       /* Add stream on the relayd */
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-                       ret = relayd_add_stream(&relayd->control_sock,
-                                       new_stream->name, new_stream->chan->pathname,
-                                       &new_stream->relayd_stream_id,
-                                       new_stream->chan->tracefile_size,
-                                       new_stream->chan->tracefile_count);
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-                       if (ret < 0) {
-                               consumer_del_stream(new_stream, NULL);
-                               goto end_nosignal;
-                       }
-               } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
-                       ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
-                                       new_stream->net_seq_idx);
+               ret = send_relayd_stream(new_stream, NULL);
+               if (ret < 0) {
                        consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
                }
@@ -318,11 +633,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
+               if (new_stream->metadata_flag) {
+                       channel->metadata_stream = new_stream;
+               }
+
                /* Do not monitor this stream. */
                if (!channel->monitor) {
                        DBG("Kernel consumer add stream %s in no monitor mode with"
                                        "relayd id %" PRIu64, new_stream->name,
                                        new_stream->relayd_stream_id);
+                       cds_list_add(&new_stream->no_monitor_node,
+                                       &channel->stream_no_monitor_list.head);
                        break;
                }
 
@@ -411,6 +732,23 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
        {
+               if (msg.u.snapshot_channel.metadata == 1) {
+                       ret = lttng_kconsumer_snapshot_metadata(msg.u.snapshot_channel.key,
+                                       msg.u.snapshot_channel.pathname, ctx);
+                       if (ret < 0) {
+                               ERR("Snapshot metadata failed");
+                               ret_code = LTTNG_ERR_KERN_META_FAIL;
+                       }
+               } else {
+                       ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key,
+                                       msg.u.snapshot_channel.pathname,
+                                       msg.u.snapshot_channel.relayd_id, ctx);
+                       if (ret < 0) {
+                               ERR("Snapshot channel failed");
+                               ret_code = LTTNG_ERR_KERN_CHAN_FAIL;
+                       }
+               }
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -418,6 +756,39 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                break;
        }
+       case LTTNG_CONSUMER_DESTROY_CHANNEL:
+       {
+               uint64_t key = msg.u.destroy_channel.key;
+               struct lttng_consumer_channel *channel;
+
+               channel = consumer_find_channel(key);
+               if (!channel) {
+                       ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
+                       ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
+               }
+
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
+               /*
+                * This command should ONLY be issued for channel with streams set in
+                * no monitor mode.
+                */
+               assert(!channel->monitor);
+
+               /*
+                * The refcount should ALWAYS be 0 in the case of a channel in no
+                * monitor mode.
+                */
+               assert(!uatomic_sub_return(&channel->refcount, 1));
+
+               consumer_del_channel(channel);
+
+               goto end_nosignal;
+       }
        default:
                goto end_nosignal;
        }
@@ -474,8 +845,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
 
        switch (stream->chan->output) {
-       case LTTNG_EVENT_SPLICE:
-
+       case CONSUMER_CHANNEL_SPLICE:
                /*
                 * XXX: The lttng-modules splice "actor" does not handle copying
                 * partial pages hence only using the subbuffer size without the
@@ -500,7 +870,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        ret, subbuf_size);
                }
                break;
-       case LTTNG_EVENT_MMAP:
+       case CONSUMER_CHANNEL_MMAP:
                /* Get subbuffer size without padding */
                err = kernctl_get_subbuf_size(infd, &subbuf_size);
                if (err != 0) {
This page took 0.032539 seconds and 5 git commands to generate.