+ /*
+ * For each consumer socket, create and send the relayd object of the
+ * snapshot output.
+ */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(snap_output->consumer->socks->ht, &iter.iter,
+ socket, node.node) {
+ pthread_mutex_lock(socket->lock);
+ ret = send_consumer_relayd_sockets(0, session->id,
+ snap_output->consumer, socket,
+ session->name, session->hostname,
+ session->live_timer);
+ pthread_mutex_unlock(socket->lock);
+ if (ret != LTTNG_OK) {
+ rcu_read_unlock();
+ goto error;
+ }
+ }
+ rcu_read_unlock();
+
+error:
+ return ret;
+}
+
+/*
+ * Record a kernel snapshot.
+ *
+ * Return LTTNG_OK on success or a LTTNG_ERR code.
+ */
+static int record_kernel_snapshot(struct ltt_kernel_session *ksess,
+ struct snapshot_output *output, struct ltt_session *session,
+ int wait, uint64_t nb_packets_per_stream)
+{
+ int ret;
+
+ assert(ksess);
+ assert(output);
+ assert(session);
+
+
+ /*
+ * Copy kernel session sockets so we can communicate with the right
+ * consumer for the snapshot record command.
+ */
+ ret = consumer_copy_sockets(output->consumer, ksess->consumer);
+ if (ret < 0) {
+ ret = LTTNG_ERR_NOMEM;
+ goto error;
+ }
+
+ ret = set_relayd_for_snapshot(ksess->consumer, output, session);
+ if (ret != LTTNG_OK) {
+ goto error_snapshot;
+ }
+
+ ret = kernel_snapshot_record(ksess, output, wait, nb_packets_per_stream);
+ if (ret != LTTNG_OK) {
+ goto error_snapshot;
+ }
+
+ ret = LTTNG_OK;
+ goto end;
+
+error_snapshot:
+ /* Clean up copied sockets so this output can use some other later on. */
+ consumer_destroy_output_sockets(output->consumer);
+error:
+end:
+ return ret;
+}
+
+/*
+ * Record a UST snapshot.
+ *
+ * Return 0 on success or a LTTNG_ERR error code.
+ */
+static int record_ust_snapshot(struct ltt_ust_session *usess,
+ struct snapshot_output *output, struct ltt_session *session,
+ int wait, uint64_t nb_packets_per_stream)
+{
+ int ret;
+
+ assert(usess);
+ assert(output);
+ assert(session);
+
+ /*
+ * Copy UST session sockets so we can communicate with the right
+ * consumer for the snapshot record command.
+ */
+ ret = consumer_copy_sockets(output->consumer, usess->consumer);
+ if (ret < 0) {
+ ret = LTTNG_ERR_NOMEM;
+ goto error;
+ }
+
+ ret = set_relayd_for_snapshot(usess->consumer, output, session);
+ if (ret != LTTNG_OK) {
+ goto error_snapshot;
+ }
+
+ ret = ust_app_snapshot_record(usess, output, wait, nb_packets_per_stream);
+ if (ret < 0) {
+ switch (-ret) {
+ case EINVAL:
+ ret = LTTNG_ERR_INVALID;
+ break;
+ default:
+ ret = LTTNG_ERR_SNAPSHOT_FAIL;
+ break;
+ }
+ goto error_snapshot;
+ }
+
+ ret = LTTNG_OK;
+
+error_snapshot:
+ /* Clean up copied sockets so this output can use some other later on. */
+ consumer_destroy_output_sockets(output->consumer);
+error:
+ return ret;
+}
+
+static
+uint64_t get_session_size_one_more_packet_per_stream(struct ltt_session *session,
+ uint64_t cur_nr_packets)
+{
+ uint64_t tot_size = 0;
+
+ if (session->kernel_session) {
+ struct ltt_kernel_channel *chan;
+ struct ltt_kernel_session *ksess = session->kernel_session;
+
+ cds_list_for_each_entry(chan, &ksess->channel_list.head, list) {
+ if (cur_nr_packets >= chan->channel->attr.num_subbuf) {
+ /*
+ * Don't take channel into account if we
+ * already grab all its packets.
+ */
+ continue;
+ }
+ tot_size += chan->channel->attr.subbuf_size
+ * chan->stream_count;
+ }
+ }
+
+ if (session->ust_session) {
+ struct ltt_ust_session *usess = session->ust_session;
+
+ tot_size += ust_app_get_size_one_more_packet_per_stream(usess,
+ cur_nr_packets);
+ }
+
+ return tot_size;
+}
+
+/*
+ * Calculate the number of packets we can grab from each stream that
+ * fits within the overall snapshot max size.
+ *
+ * Returns -1 on error, 0 means infinite number of packets, else > 0 is
+ * the number of packets per stream.
+ *
+ * TODO: this approach is not perfect: we consider the worse case
+ * (packet filling the sub-buffers) as an upper bound, but we could do
+ * better if we do this calculation while we actually grab the packet
+ * content: we would know how much padding we don't actually store into
+ * the file.
+ *
+ * This algorithm is currently bounded by the number of packets per
+ * stream.
+ *
+ * Since we call this algorithm before actually grabbing the data, it's
+ * an approximation: for instance, applications could appear/disappear
+ * in between this call and actually grabbing data.
+ */
+static
+int64_t get_session_nb_packets_per_stream(struct ltt_session *session, uint64_t max_size)
+{
+ int64_t size_left;
+ uint64_t cur_nb_packets = 0;
+
+ if (!max_size) {
+ return 0; /* Infinite */
+ }
+
+ size_left = max_size;
+ for (;;) {
+ uint64_t one_more_packet_tot_size;
+
+ one_more_packet_tot_size = get_session_size_one_more_packet_per_stream(session,
+ cur_nb_packets);
+ if (!one_more_packet_tot_size) {
+ /* We are already grabbing all packets. */
+ break;
+ }
+ size_left -= one_more_packet_tot_size;
+ if (size_left < 0) {
+ break;
+ }
+ cur_nb_packets++;
+ }
+ if (!cur_nb_packets) {
+ /* Not enough room to grab one packet of each stream, error. */
+ return -1;
+ }
+ return cur_nb_packets;
+}
+
+/*
+ * Command LTTNG_SNAPSHOT_RECORD from lib lttng ctl.
+ *
+ * The wait parameter is ignored so this call always wait for the snapshot to
+ * complete before returning.
+ *
+ * Return LTTNG_OK on success or else a LTTNG_ERR code.
+ */
+int cmd_snapshot_record(struct ltt_session *session,
+ struct lttng_snapshot_output *output, int wait)
+{
+ int ret = LTTNG_OK;
+ unsigned int use_tmp_output = 0;
+ struct snapshot_output tmp_output;
+ unsigned int snapshot_success = 0;
+ char datetime[16];
+
+ assert(session);
+ assert(output);
+
+ DBG("Cmd snapshot record for session %s", session->name);
+
+ /* Get the datetime for the snapshot output directory. */
+ ret = utils_get_current_time_str("%Y%m%d-%H%M%S", datetime,
+ sizeof(datetime));
+ if (!ret) {
+ ret = LTTNG_ERR_INVALID;
+ goto error;
+ }
+
+ /*
+ * Permission denied to create an output if the session is not
+ * set in no output mode.
+ */
+ if (session->output_traces) {
+ ret = LTTNG_ERR_NOT_SNAPSHOT_SESSION;
+ goto error;
+ }
+
+ /* The session needs to be started at least once. */
+ if (!session->has_been_started) {
+ ret = LTTNG_ERR_START_SESSION_ONCE;
+ goto error;
+ }
+
+ /* Use temporary output for the session. */
+ if (*output->ctrl_url != '\0') {
+ ret = snapshot_output_init(output->max_size, output->name,
+ output->ctrl_url, output->data_url, session->consumer,
+ &tmp_output, NULL);
+ if (ret < 0) {
+ if (ret == -ENOMEM) {
+ ret = LTTNG_ERR_NOMEM;
+ } else {
+ ret = LTTNG_ERR_INVALID;
+ }
+ goto error;
+ }
+ /* Use the global session count for the temporary snapshot. */
+ tmp_output.nb_snapshot = session->snapshot.nb_snapshot;
+
+ /* Use the global datetime */
+ memcpy(tmp_output.datetime, datetime, sizeof(datetime));
+ use_tmp_output = 1;
+ }
+
+ if (use_tmp_output) {
+ int64_t nb_packets_per_stream;
+
+ nb_packets_per_stream = get_session_nb_packets_per_stream(session,
+ tmp_output.max_size);
+ if (nb_packets_per_stream < 0) {
+ ret = LTTNG_ERR_MAX_SIZE_INVALID;
+ goto error;
+ }
+
+ if (session->kernel_session) {
+ ret = record_kernel_snapshot(session->kernel_session,
+ &tmp_output, session,
+ wait, nb_packets_per_stream);
+ if (ret != LTTNG_OK) {
+ goto error;
+ }
+ }
+
+ if (session->ust_session) {
+ ret = record_ust_snapshot(session->ust_session,
+ &tmp_output, session,
+ wait, nb_packets_per_stream);
+ if (ret != LTTNG_OK) {
+ goto error;
+ }
+ }
+
+ snapshot_success = 1;
+ } else {
+ struct snapshot_output *sout;
+ struct lttng_ht_iter iter;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(session->snapshot.output_ht->ht,
+ &iter.iter, sout, node.node) {
+ int64_t nb_packets_per_stream;
+
+ /*
+ * Make a local copy of the output and assign the possible
+ * temporary value given by the caller.
+ */
+ memset(&tmp_output, 0, sizeof(tmp_output));
+ memcpy(&tmp_output, sout, sizeof(tmp_output));
+
+ if (output->max_size != (uint64_t) -1ULL) {
+ tmp_output.max_size = output->max_size;
+ }
+
+ nb_packets_per_stream = get_session_nb_packets_per_stream(session,
+ tmp_output.max_size);
+ if (nb_packets_per_stream < 0) {
+ ret = LTTNG_ERR_MAX_SIZE_INVALID;
+ rcu_read_unlock();
+ goto error;
+ }
+
+ /* Use temporary name. */
+ if (*output->name != '\0') {
+ if (lttng_strncpy(tmp_output.name, output->name,
+ sizeof(tmp_output.name))) {
+ ret = LTTNG_ERR_INVALID;
+ rcu_read_unlock();
+ goto error;
+ }
+ }
+
+ tmp_output.nb_snapshot = session->snapshot.nb_snapshot;
+ memcpy(tmp_output.datetime, datetime, sizeof(datetime));
+
+ if (session->kernel_session) {
+ ret = record_kernel_snapshot(session->kernel_session,
+ &tmp_output, session,
+ wait, nb_packets_per_stream);
+ if (ret != LTTNG_OK) {
+ rcu_read_unlock();
+ goto error;
+ }
+ }
+
+ if (session->ust_session) {
+ ret = record_ust_snapshot(session->ust_session,
+ &tmp_output, session,
+ wait, nb_packets_per_stream);
+ if (ret != LTTNG_OK) {
+ rcu_read_unlock();
+ goto error;
+ }
+ }
+ snapshot_success = 1;
+ }
+ rcu_read_unlock();
+ }
+
+ if (snapshot_success) {
+ session->snapshot.nb_snapshot++;
+ } else {
+ ret = LTTNG_ERR_SNAPSHOT_FAIL;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * Command LTTNG_SET_SESSION_SHM_PATH processed by the client thread.
+ */
+int cmd_set_session_shm_path(struct ltt_session *session,
+ const char *shm_path)
+{
+ /* Safety net */
+ assert(session);
+
+ /*
+ * Can only set shm path before session is started.
+ */
+ if (session->has_been_started) {
+ return LTTNG_ERR_SESSION_STARTED;
+ }
+
+ strncpy(session->shm_path, shm_path,
+ sizeof(session->shm_path));
+ session->shm_path[sizeof(session->shm_path) - 1] = '\0';
+
+ return 0;
+}
+
+/*
+ * Command LTTNG_ROTATE_SESSION from the lttng-ctl library.
+ *
+ * Ask the consumer to rotate the session output directory.
+ * The session lock must be held.
+ *
+ * Return LTTNG_OK on success or else a LTTNG_ERR code.
+ */
+int cmd_rotate_session(struct ltt_session *session,
+ struct lttng_rotate_session_return *rotate_return)
+{
+ int ret;
+ size_t strf_ret;
+ struct tm *timeinfo;
+ char datetime[21];
+ time_t now;
+ bool ust_active = false;
+
+ assert(session);
+
+ if (!session->has_been_started) {
+ ret = -LTTNG_ERR_START_SESSION_ONCE;
+ goto end;