Fix: sessiond: snapshot: handle consumer return codes
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
index 5e723e9b3804e08335d3e16758486899ec79f83e..a179eeffd1f72f5cde0d1fcb013e70ae2ac4b7e2 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
+ *               2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -561,7 +562,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj)
        }
        output->enabled = obj->enabled;
        output->net_seq_index = obj->net_seq_index;
-       memcpy(output->subdir, obj->subdir, PATH_MAX);
+       memcpy(output->subdir, obj->subdir, sizeof(output->subdir));
        output->snapshot = obj->snapshot;
        output->relay_major_version = obj->relay_major_version;
        output->relay_minor_version = obj->relay_minor_version;
@@ -715,11 +716,12 @@ int consumer_set_network_uri(struct consumer_output *obj,
                        goto error;
                }
 
-               if (lttng_strncpy(obj->subdir, tmp_path, sizeof(obj->subdir))) {
+               if (lttng_strncpy(obj->dst.net.base_dir, tmp_path,
+                               sizeof(obj->dst.net.base_dir))) {
                        ret = -LTTNG_ERR_INVALID;
                        goto error;
                }
-               DBG3("Consumer set network uri subdir path %s", tmp_path);
+               DBG3("Consumer set network uri base_dir path %s", tmp_path);
        }
 
        return 0;
@@ -731,8 +733,11 @@ error:
 
 /*
  * Send file descriptor to consumer via sock.
+ *
+ * The consumer socket lock must be held by the caller.
  */
-int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
+int consumer_send_fds(struct consumer_socket *sock, const int *fds,
+               size_t nb_fd)
 {
        int ret;
 
@@ -755,6 +760,8 @@ error:
 
 /*
  * Consumer send communication message structure to consumer.
+ *
+ * The consumer socket lock must be held by the caller.
  */
 int consumer_send_msg(struct consumer_socket *sock,
                struct lttcomm_consumer_msg *msg)
@@ -778,6 +785,8 @@ error:
 
 /*
  * Consumer send channel communication message structure to consumer.
+ *
+ * The consumer socket lock must be held by the caller.
  */
 int consumer_send_channel(struct consumer_socket *sock,
                struct lttcomm_consumer_msg *msg)
@@ -826,7 +835,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                uint32_t ust_app_uid,
                int64_t blocking_timeout,
                const char *root_shm_path,
-               const char *shm_path)
+               const char *shm_path,
+               uint64_t trace_archive_id)
 {
        assert(msg);
 
@@ -855,6 +865,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.ask_channel.monitor = monitor;
        msg->u.ask_channel.ust_app_uid = ust_app_uid;
        msg->u.ask_channel.blocking_timeout = blocking_timeout;
+       msg->u.ask_channel.trace_archive_id = trace_archive_id;
 
        memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
 
@@ -882,8 +893,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
 /*
  * Init channel communication message structure.
  */
-void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
-               enum lttng_consumer_command cmd,
+void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                uint64_t channel_key,
                uint64_t session_id,
                const char *pathname,
@@ -906,7 +916,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
 
        /* Send channel */
-       msg->cmd_type = cmd;
+       msg->cmd_type = LTTNG_CONSUMER_ADD_CHANNEL;
        msg->u.channel.channel_key = channel_key;
        msg->u.channel.session_id = session_id;
        msg->u.channel.uid = uid;
@@ -932,20 +942,21 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
 /*
  * Init stream communication message structure.
  */
-void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
-               enum lttng_consumer_command cmd,
+void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                uint64_t channel_key,
                uint64_t stream_key,
-               int cpu)
+               int32_t cpu,
+               uint64_t trace_archive_id)
 {
        assert(msg);
 
        memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
 
-       msg->cmd_type = cmd;
+       msg->cmd_type = LTTNG_CONSUMER_ADD_STREAM;
        msg->u.stream.channel_key = channel_key;
        msg->u.stream.stream_key = stream_key;
        msg->u.stream.cpu = cpu;
+       msg->u.stream.trace_archive_id = trace_archive_id;
 }
 
 void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg,
@@ -966,7 +977,7 @@ void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg,
  */
 int consumer_send_stream(struct consumer_socket *sock,
                struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
-               int *fds, size_t nb_fd)
+               const int *fds, size_t nb_fd)
 {
        int ret;
 
@@ -1057,35 +1068,59 @@ error:
        return ret;
 }
 
-int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
-               int pipe)
+static
+int consumer_send_pipe(struct consumer_socket *consumer_sock,
+               enum lttng_consumer_command cmd, int pipe)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
+       const char *pipe_name;
+       const char *command_name;
+
+       switch (cmd) {
+       case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
+               pipe_name = "channel monitor";
+               command_name = "SET_CHANNEL_MONITOR_PIPE";
+               break;
+       default:
+               ERR("Unexpected command received in %s (cmd = %d)", __func__,
+                               (int) cmd);
+               abort();
+       }
 
        /* Code flow error. Safety net. */
 
        memset(&msg, 0, sizeof(msg));
-       msg.cmd_type = LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE;
+       msg.cmd_type = cmd;
 
-       DBG3("Sending set_channel_monitor_pipe command to consumer");
+       pthread_mutex_lock(consumer_sock->lock);
+       DBG3("Sending %s command to consumer", command_name);
        ret = consumer_send_msg(consumer_sock, &msg);
        if (ret < 0) {
                goto error;
        }
 
-       DBG3("Sending channel monitoring pipe %d to consumer on socket %d",
+       DBG3("Sending %s pipe %d to consumer on socket %d",
+                       pipe_name,
                        pipe, *consumer_sock->fd_ptr);
        ret = consumer_send_fds(consumer_sock, &pipe, 1);
        if (ret < 0) {
                goto error;
        }
 
-       DBG2("Channel monitoring pipe successfully sent");
+       DBG2("%s pipe successfully sent", pipe_name);
 error:
+       pthread_mutex_unlock(consumer_sock->lock);
        return ret;
 }
 
+/*
+ * Send the channel monitor pipe to the consumer.
+ *
+ * Thin wrapper around consumer_send_pipe() which selects the
+ * LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE command. Returns whatever
+ * consumer_send_pipe() returns.
+ */
+int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
+               int pipe)
+{
+       const enum lttng_consumer_command cmd =
+                       LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE;
+
+       return consumer_send_pipe(consumer_sock, cmd, pipe);
+}
+
 /*
  * Set consumer subdirectory using the session name and a generated datetime if
  * needed. This is appended to the current subdirectory.
@@ -1387,7 +1422,8 @@ end:
  */
 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
                struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
-               const char *session_path, int wait, uint64_t nb_packets_per_stream)
+               const char *session_path, int wait, uint64_t nb_packets_per_stream,
+               uint64_t trace_archive_id)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -1403,29 +1439,54 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
        msg.u.snapshot_channel.key = key;
        msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream;
        msg.u.snapshot_channel.metadata = metadata;
+       msg.u.snapshot_channel.trace_archive_id = trace_archive_id;
 
        if (output->consumer->type == CONSUMER_DST_NET) {
                msg.u.snapshot_channel.relayd_id = output->consumer->net_seq_index;
                msg.u.snapshot_channel.use_relayd = 1;
                ret = snprintf(msg.u.snapshot_channel.pathname,
                                sizeof(msg.u.snapshot_channel.pathname),
-                               "%s/%s-%s-%" PRIu64 "%s", output->consumer->subdir,
-                               output->name, output->datetime, output->nb_snapshot,
+                               "%s/%s/%s-%s-%" PRIu64 "%s",
+                               output->consumer->dst.net.base_dir,
+                               output->consumer->subdir,
+                               output->name, output->datetime,
+                               output->nb_snapshot,
                                session_path);
                if (ret < 0) {
                        ret = -LTTNG_ERR_NOMEM;
                        goto error;
+               } else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) {
+                       ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s/%s-%s-%" PRIu64 "%s\"",
+                                       sizeof(msg.u.snapshot_channel.pathname),
+                                       ret, output->consumer->dst.net.base_dir,
+                                       output->consumer->subdir,
+                                       output->name, output->datetime,
+                                       output->nb_snapshot,
+                                       session_path);
+                       ret = -LTTNG_ERR_SNAPSHOT_FAIL;
+                       goto error;
                }
        } else {
                ret = snprintf(msg.u.snapshot_channel.pathname,
                                sizeof(msg.u.snapshot_channel.pathname),
-                               "%s/%s-%s-%" PRIu64 "%s", output->consumer->dst.trace_path,
-                               output->name, output->datetime, output->nb_snapshot,
+                               "%s/%s-%s-%" PRIu64 "%s",
+                               output->consumer->dst.session_root_path,
+                               output->name, output->datetime,
+                               output->nb_snapshot,
                                session_path);
                if (ret < 0) {
                        ret = -LTTNG_ERR_NOMEM;
                        goto error;
+               } else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) {
+                       ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s-%s-%" PRIu64 "%s\"",
+                                       sizeof(msg.u.snapshot_channel.pathname),
+                                       ret, output->consumer->dst.session_root_path,
+                                       output->name, output->datetime, output->nb_snapshot,
+                                       session_path);
+                       ret = -LTTNG_ERR_SNAPSHOT_FAIL;
+                       goto error;
                }
+
                msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL;
 
                /* Create directory. Ignore if exist. */
@@ -1440,8 +1501,18 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
        }
 
        health_code_update();
+       pthread_mutex_lock(socket->lock);
        ret = consumer_send_msg(socket, &msg);
+       pthread_mutex_unlock(socket->lock);
        if (ret < 0) {
+               switch (-ret) {
+               case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND:
+                       ret = -LTTNG_ERR_CHAN_NOT_FOUND;
+                       break;
+               default:
+                       ret = -LTTNG_ERR_SNAPSHOT_FAIL;
+                       break;
+               }
                goto error;
        }
 
@@ -1563,3 +1634,266 @@ end:
        rcu_read_unlock();
        return ret;
 }
+
+/*
+ * Ask the consumer to rotate a channel.
+ *
+ * domain_path contains "/kernel" for kernel or the complete path for UST
+ * (ex: /ust/uid/1000/64-bit). It is concatenated after the output's base
+ * path (dst.net.base_dir for a network output, dst.session_root_path
+ * otherwise) and the output's chunk_path to form the path name sent to the
+ * consumer.
+ *
+ * The new_chunk_id is the session->rotate_count that has been incremented
+ * when the rotation started. On the relay, this makes it possible to keep
+ * track of the chunk each stream is currently writing to (for the
+ * rotate_pending operation).
+ *
+ * The uid and gid parameters are currently not used by this function.
+ *
+ * The consumer socket lock is acquired and released by this function.
+ *
+ * Returns the non-negative value returned by consumer_send_msg() on success,
+ * -LTTNG_ERR_INVALID if the path name cannot be formatted or does not fit in
+ * the message, -LTTNG_ERR_CHAN_NOT_FOUND if the consumer reports that the
+ * channel does not exist, and -LTTNG_ERR_ROTATION_FAIL_CONSUMER for any other
+ * communication error.
+ */
+int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
+               uid_t uid, gid_t gid, struct consumer_output *output,
+               char *domain_path, bool is_metadata_channel,
+               uint64_t new_chunk_id)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(socket);
+
+       DBG("Consumer rotate channel key %" PRIu64, key);
+
+       pthread_mutex_lock(socket->lock);
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL;
+       msg.u.rotate_channel.key = key;
+       msg.u.rotate_channel.metadata = !!is_metadata_channel;
+       msg.u.rotate_channel.new_chunk_id = new_chunk_id;
+
+       if (output->type == CONSUMER_DST_NET) {
+               msg.u.rotate_channel.relayd_id = output->net_seq_index;
+               ret = snprintf(msg.u.rotate_channel.pathname,
+                               sizeof(msg.u.rotate_channel.pathname), "%s%s%s",
+                               output->dst.net.base_dir,
+                               output->chunk_path, domain_path);
+               /*
+                * snprintf() returns the length the complete string would
+                * have had: any value greater than or equal to the buffer
+                * size means the path was truncated.
+                */
+               if (ret < 0 || (size_t) ret >= sizeof(msg.u.rotate_channel.pathname)) {
+                       ERR("Failed to format channel path name when asking consumer to rotate channel");
+                       ret = -LTTNG_ERR_INVALID;
+                       goto error;
+               }
+       } else {
+               /* No relayd is involved for a local output. */
+               msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL;
+               ret = snprintf(msg.u.rotate_channel.pathname,
+                               sizeof(msg.u.rotate_channel.pathname), "%s%s%s",
+                               output->dst.session_root_path,
+                               output->chunk_path, domain_path);
+               /* Same truncation rule as for the network output above. */
+               if (ret < 0 || (size_t) ret >= sizeof(msg.u.rotate_channel.pathname)) {
+                       ERR("Failed to format channel path name when asking consumer to rotate channel");
+                       ret = -LTTNG_ERR_INVALID;
+                       goto error;
+               }
+       }
+
+       health_code_update();
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               /* Translate the consumer's status into an LTTNG_ERR code. */
+               switch (-ret) {
+               case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND:
+                       ret = -LTTNG_ERR_CHAN_NOT_FOUND;
+                       break;
+               default:
+                       ret = -LTTNG_ERR_ROTATION_FAIL_CONSUMER;
+                       break;
+               }
+               goto error;
+       }
+error:
+       pthread_mutex_unlock(socket->lock);
+       health_code_update();
+       return ret;
+}
+
+/*
+ * Send a LTTNG_CONSUMER_ROTATE_RENAME command to the consumer, carrying
+ * old_path and new_path along with the session id, uid and gid.
+ *
+ * This function does not take the consumer socket lock itself;
+ * consumer_send_msg() requires it to be held by the caller.
+ *
+ * Returns the non-negative value returned by consumer_send_msg() on success,
+ * -LTTNG_ERR_INVALID if either path does not fit in the consumer message, and
+ * -LTTNG_ERR_ROTATE_RENAME_FAIL_CONSUMER if the command could not be sent.
+ */
+int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
+               const struct consumer_output *output, const char *old_path,
+               const char *new_path, uid_t uid, gid_t gid)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+       size_t old_len, new_len;
+
+       assert(socket);
+       assert(old_path);
+       assert(new_path);
+
+       DBG("Consumer rotate rename session %" PRIu64 ", old path = \"%s\", new_path = \"%s\"",
+                       session_id, old_path, new_path);
+
+       /* Both paths, including their terminator, must fit in the message. */
+       old_len = strlen(old_path);
+       if (old_len >= sizeof(msg.u.rotate_rename.old_path)) {
+               ERR("consumer_rotate_rename: old path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)",
+                               old_len + 1, sizeof(msg.u.rotate_rename.old_path));
+               ret = -LTTNG_ERR_INVALID;
+               goto end;
+       }
+
+       new_len = strlen(new_path);
+       if (new_len >= sizeof(msg.u.rotate_rename.new_path)) {
+               ERR("consumer_rotate_rename: new path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)",
+                               new_len + 1, sizeof(msg.u.rotate_rename.new_path));
+               ret = -LTTNG_ERR_INVALID;
+               goto end;
+       }
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_ROTATE_RENAME;
+       msg.u.rotate_rename.session_id = session_id;
+       msg.u.rotate_rename.uid = uid;
+       msg.u.rotate_rename.gid = gid;
+       /* Lengths were validated above; copy the terminator as well. */
+       memcpy(msg.u.rotate_rename.old_path, old_path, old_len + 1);
+       memcpy(msg.u.rotate_rename.new_path, new_path, new_len + 1);
+       msg.u.rotate_rename.relayd_id = output->type == CONSUMER_DST_NET ?
+                       output->net_seq_index : -1ULL;
+
+       health_code_update();
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               ret = -LTTNG_ERR_ROTATE_RENAME_FAIL_CONSUMER;
+       }
+
+end:
+       health_code_update();
+       return ret;
+}
+
+/*
+ * Query the consumer to find out whether a rotation is still pending locally
+ * for the given session and chunk. The socket lock must be held by the
+ * caller.
+ *
+ * The consumer's 32-bit reply is returned as-is: 1 if the rotation is still
+ * pending, 0 if it is finished. A negative value is returned on error.
+ */
+int consumer_check_rotation_pending_local(struct consumer_socket *socket,
+               uint64_t session_id, uint64_t chunk_id)
+{
+       int ret;
+       uint32_t is_pending = 0;
+       struct lttcomm_consumer_msg msg;
+
+       assert(socket);
+
+       DBG("Asking consumer to locally check for pending rotation for session %" PRIu64 ", chunk id %" PRIu64,
+                       session_id, chunk_id);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL;
+       msg.u.check_rotation_pending_local.session_id = session_id;
+       msg.u.check_rotation_pending_local.chunk_id = chunk_id;
+
+       health_code_update();
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               ret = -LTTNG_ERR_ROTATION_PENDING_LOCAL_FAIL_CONSUMER;
+       } else {
+               /* The consumer answers with the pending flag. */
+               ret = consumer_socket_recv(socket, &is_pending,
+                               sizeof(is_pending));
+               if (ret >= 0) {
+                       ret = is_pending;
+               }
+       }
+
+       health_code_update();
+       return ret;
+}
+
+/*
+ * Query the consumer to find out whether a rotation is still pending on the
+ * relayd for the given session and chunk. The socket lock must be held by the
+ * caller and the output must be a network output.
+ *
+ * The consumer's 32-bit reply is returned as-is: 1 if the rotation is still
+ * pending, 0 if it is finished. A negative value is returned on error.
+ */
+int consumer_check_rotation_pending_relay(struct consumer_socket *socket,
+               const struct consumer_output *output, uint64_t session_id,
+               uint64_t chunk_id)
+{
+       int ret;
+       uint32_t is_pending = 0;
+       struct lttcomm_consumer_msg msg;
+
+       assert(socket);
+
+       DBG("Asking consumer to check for pending rotation on relay for session %" PRIu64 ", chunk id %" PRIu64,
+                       session_id, chunk_id);
+       assert(output->type == CONSUMER_DST_NET);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY;
+       msg.u.check_rotation_pending_relay.session_id = session_id;
+       msg.u.check_rotation_pending_relay.relayd_id = output->net_seq_index;
+       msg.u.check_rotation_pending_relay.chunk_id = chunk_id;
+
+       health_code_update();
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               ret = -LTTNG_ERR_ROTATION_PENDING_RELAY_FAIL_CONSUMER;
+       } else {
+               /* The consumer answers with the pending flag. */
+               ret = consumer_socket_recv(socket, &is_pending,
+                               sizeof(is_pending));
+               if (ret >= 0) {
+                       ret = is_pending;
+               }
+       }
+
+       health_code_update();
+       return ret;
+}
+
+/*
+ * Send a LTTNG_CONSUMER_MKDIR command asking the consumer to create the
+ * directory at path, with the given uid and gid.
+ *
+ * Called with the consumer socket lock held.
+ *
+ * Returns the non-negative value returned by consumer_send_msg() on success,
+ * -LTTNG_ERR_INVALID if path does not fit in the consumer message, and
+ * -LTTNG_ERR_MKDIR_FAIL_CONSUMER if the command could not be sent.
+ */
+int consumer_mkdir(struct consumer_socket *socket, uint64_t session_id,
+               const struct consumer_output *output, const char *path,
+               uid_t uid, gid_t gid)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(socket);
+
+       DBG("Consumer mkdir %s in session %" PRIu64, path, session_id);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_MKDIR;
+       msg.u.mkdir.session_id = session_id;
+       msg.u.mkdir.uid = uid;
+       msg.u.mkdir.gid = gid;
+
+       /* Copy the path, rejecting it if it would be truncated. */
+       ret = snprintf(msg.u.mkdir.path, sizeof(msg.u.mkdir.path), "%s", path);
+       if (ret < 0 || ret >= sizeof(msg.u.mkdir.path)) {
+               ERR("Format path");
+               ret = -LTTNG_ERR_INVALID;
+               goto end;
+       }
+
+       msg.u.mkdir.relayd_id = output->type == CONSUMER_DST_NET ?
+                       output->net_seq_index : -1ULL;
+
+       health_code_update();
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               ret = -LTTNG_ERR_MKDIR_FAIL_CONSUMER;
+       }
+
+end:
+       health_code_update();
+       return ret;
+}
This page took 0.029975 seconds and 5 git commands to generate.