+ *chanp = channel;
+
+ return 0;
+
+error_create:
+ return ret;
+}
+
+/*
+ * Send a single given stream to the session daemon using the sock.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+ assert(sock >= 0);
+
+ DBG2("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
+
+ /* Send stream to session daemon. */
+ ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
+ if (ret < 0) {
+ goto error;
+ }
+
+ ret = ustctl_stream_close_wakeup_fd(stream->ustream);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * Send channel to sessiond.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int send_sessiond_channel(int sock,
+ struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx, int *relayd_error)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+
+ assert(channel);
+ assert(ctx);
+ assert(sock >= 0);
+
+ DBG("UST consumer sending channel %s to sessiond", channel->name);
+
+ /* Send channel to sessiond. */
+ ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* The channel was sent successfully to the sessiond at this point. */
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ /* Try to send the stream to the relayd if one is available. */
+ ret = send_stream_to_relayd(stream);
+ if (ret < 0) {
+ /*
+ * Flag that the relayd was the problem here probably due to a
+ * communicaton error on the socket.
+ */
+ if (relayd_error) {
+ *relayd_error = 1;
+ }
+ goto error;
+ }
+
+ /* Send stream to session daemon. */
+ ret = send_sessiond_stream(sock, stream);
+ if (ret < 0) {
+ goto error;
+ }
+ }
+
+ /* Tell sessiond there is no more stream. */
+ ret = ustctl_send_stream_to_sessiond(sock, NULL);
+ if (ret < 0) {
+ goto error;
+ }
+
+ DBG("UST consumer NULL stream sent to sessiond");
+
+ return 0;
+
+error:
+ return ret;
+}
+
+/*
+ * Creates a channel and streams and add the channel it to the channel internal
+ * state. The created stream must ONLY be sent once the GET_CHANNEL command is
+ * received.
+ *
+ * Return 0 on success or else, a negative value is returned and the channel
+ * MUST be destroyed by consumer_del_channel().
+ */
+static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
+ struct lttng_consumer_channel *channel,
+ struct ustctl_consumer_channel_attr *attr)
+{
+ int ret;
+
+ assert(ctx);
+ assert(channel);
+ assert(attr);
+
+ /*
+ * This value is still used by the kernel consumer since for the kernel,
+ * the stream ownership is not IN the consumer so we need to have the
+ * number of left stream that needs to be initialized so we can know when
+ * to delete the channel (see consumer.c).
+ *
+ * As for the user space tracer now, the consumer creates and sends the
+ * stream to the session daemon which only sends them to the application
+ * once every stream of a channel is received making this value useless
+ * because we they will be added to the poll thread before the application
+ * receives them. This ensures that a stream can not hang up during
+ * initilization of a channel.
+ */
+ channel->nb_init_stream_left = 0;
+
+ /* The reply msg status is handled in the following call. */
+ ret = create_ust_channel(attr, &channel->uchan);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Open all streams for this channel. */
+ ret = create_ust_streams(channel, ctx);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * Send all stream of a channel to the right thread handling it.
+ *
+ * On error, return a negative value else 0 on success.
+ */
+static int send_streams_to_thread(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+ struct lttng_consumer_stream *stream, *stmp;
+
+ assert(channel);
+ assert(ctx);
+
+ /* Send streams to the corresponding thread. */
+ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+ send_node) {
+ /* Sending the stream to the thread. */
+ ret = send_stream_to_thread(stream, ctx);
+ if (ret < 0) {
+ /*
+ * If we are unable to send the stream to the thread, there is
+ * a big problem so just stop everything.
+ */
+ goto error;
+ }
+
+ /* Remove node from the channel stream list. */
+ cds_list_del(&stream->send_node);
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * Write metadata to the given channel using ustctl to convert the string to
+ * the ringbuffer.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int push_metadata(struct lttng_consumer_channel *metadata,
+ const char *metadata_str, uint64_t target_offset, uint64_t len)
+{
+ int ret;
+
+ assert(metadata);
+ assert(metadata_str);
+
+ DBG("UST consumer writing metadata to channel %s", metadata->name);
+
+ assert(target_offset == metadata->contig_metadata_written);
+ ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len);
+ if (ret < 0) {
+ ERR("ustctl write metadata fail with ret %d, len %ld", ret, len);
+ goto error;
+ }
+ metadata->contig_metadata_written += len;
+
+ ustctl_flush_buffer(metadata->metadata_stream->ustream, 1);
+
+error:
+ return ret;
+}
+
+/*
+ * Close metadata stream wakeup_fd using the given key to retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int close_metadata(uint64_t chan_key)
+{
+ int ret;
+ struct lttng_consumer_channel *channel;
+
+ DBG("UST consumer close metadata key %lu", chan_key);
+
+ channel = consumer_find_channel(chan_key);
+ if (!channel) {
+ ERR("UST consumer close metadata %lu not found", chan_key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto error;
+ }
+
+ ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
+ if (ret < 0) {
+ ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * RCU read side lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
+{
+ int ret;
+ struct lttng_consumer_channel *metadata;
+
+ DBG("UST consumer setup metadata key %lu", key);
+
+ metadata = consumer_find_channel(key);
+ if (!metadata) {
+ ERR("UST consumer push metadata %" PRIu64 " not found", key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto error;
+ }
+
+ /*
+ * Send metadata stream to relayd if one available. Availability is
+ * known if the stream is still in the list of the channel.
+ */
+ if (cds_list_empty(&metadata->streams.head)) {
+ ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error;
+ }
+
+ /* Send metadata stream to relayd if needed. */
+ ret = send_stream_to_relayd(metadata->metadata_stream);
+ if (ret < 0) {
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error;
+ }
+
+ ret = send_streams_to_thread(metadata, ctx);
+ if (ret < 0) {
+ /*
+ * If we are unable to send the stream to the thread, there is
+ * a big problem so just stop everything.
+ */
+ ret = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+ /* List MUST be empty after or else it could be reused. */
+ assert(cds_list_empty(&metadata->streams.head));
+
+ ret = 0;
+
+error: