+static int create_ust_streams(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret, cpu = 0;
+ struct ustctl_consumer_stream *ustream;
+ struct lttng_consumer_stream *stream;
+
+ assert(channel);
+ assert(ctx);
+
+ /*
+ * While a stream is available from ustctl. When NULL is returned, we've
+ * reached the end of the possible stream for the channel.
+ */
+ while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
+ int wait_fd;
+
+ wait_fd = ustctl_stream_get_wait_fd(ustream);
+
+ /* Allocate consumer stream object. */
+ stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
+ if (!stream) {
+ goto error_alloc;
+ }
+ stream->ustream = ustream;
+ /*
+ * Store it so we can save multiple function calls afterwards since
+ * this value is used heavily in the stream threads. This is UST
+ * specific so this is why it's done after allocation.
+ */
+ stream->wait_fd = wait_fd;
+
+ /*
+ * Order is important this is why a list is used. On error, the caller
+ * should clean this list.
+ */
+ cds_list_add_tail(&stream->send_node, &channel->streams.head);
+
+ ret = ustctl_get_max_subbuf_size(stream->ustream,
+ &stream->max_sb_size);
+ if (ret < 0) {
+ ERR("ustctl_get_max_subbuf_size failed for stream %s",
+ stream->name);
+ goto error;
+ }
+
+ /* Do actions once stream has been received. */
+ if (ctx->on_recv_stream) {
+ ret = ctx->on_recv_stream(stream);
+ if (ret < 0) {
+ goto error;
+ }
+ }
+
+ DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
+ stream->name, stream->key, stream->relayd_stream_id);
+
+ /* Set next CPU stream. */
+ channel->streams.count = ++cpu;
+
+ /* Keep stream reference when creating metadata. */
+ if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+ channel->metadata_stream = stream;
+ }
+ }
+
+ return 0;
+
+error:
+error_alloc:
+ return ret;
+}
+
+/*
+ * Create an UST channel with the given attributes and send it to the session
+ * daemon using the ust ctl API.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int create_ust_channel(struct ustctl_consumer_channel_attr *attr,
+ struct ustctl_consumer_channel **chanp)
+{
+ int ret;
+ struct ustctl_consumer_channel *channel;
+
+ assert(attr);
+ assert(chanp);
+
+ DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
+ "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
+ "switch_timer_interval: %u, read_timer_interval: %u, "
+ "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
+ attr->num_subbuf, attr->switch_timer_interval,
+ attr->read_timer_interval, attr->output, attr->type);
+
+ channel = ustctl_create_channel(attr);
+ if (!channel) {
+ ret = -1;
+ goto error_create;
+ }
+
+ *chanp = channel;
+
+ return 0;
+
+error_create:
+ return ret;
+}
+
+/*
+ * Send a single given stream to the session daemon using the sock.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+ assert(sock >= 0);
+
+ DBG2("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
+
+ /* Send stream to session daemon. */
+ ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * Send channel to sessiond.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int send_sessiond_channel(int sock,
+ struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx, int *relayd_error)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+
+ assert(channel);
+ assert(ctx);
+ assert(sock >= 0);
+
+ DBG("UST consumer sending channel %s to sessiond", channel->name);
+
+ /* Send channel to sessiond. */
+ ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
+ if (ret < 0) {
+ goto error;
+ }
+
+ ret = ustctl_channel_close_wakeup_fd(channel->uchan);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* The channel was sent successfully to the sessiond at this point. */
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ /* Try to send the stream to the relayd if one is available. */
+ ret = send_stream_to_relayd(stream);
+ if (ret < 0) {
+ /*
+ * Flag that the relayd was the problem here probably due to a
+ * communicaton error on the socket.
+ */
+ if (relayd_error) {
+ *relayd_error = 1;
+ }
+ goto error;
+ }
+
+ /* Send stream to session daemon. */
+ ret = send_sessiond_stream(sock, stream);
+ if (ret < 0) {
+ goto error;
+ }
+ }
+
+ /* Tell sessiond there is no more stream. */
+ ret = ustctl_send_stream_to_sessiond(sock, NULL);
+ if (ret < 0) {
+ goto error;
+ }
+
+ DBG("UST consumer NULL stream sent to sessiond");
+
+ return 0;
+
+error:
+ return ret;
+}
+
+/*
+ * Creates a channel and streams and add the channel it to the channel internal
+ * state. The created stream must ONLY be sent once the GET_CHANNEL command is
+ * received.
+ *
+ * Return 0 on success or else, a negative value is returned and the channel
+ * MUST be destroyed by consumer_del_channel().
+ */
+static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
+ struct lttng_consumer_channel *channel,
+ struct ustctl_consumer_channel_attr *attr)
+{
+ int ret;
+
+ assert(ctx);
+ assert(channel);
+ assert(attr);
+
+ /*
+ * This value is still used by the kernel consumer since for the kernel,
+ * the stream ownership is not IN the consumer so we need to have the
+ * number of left stream that needs to be initialized so we can know when
+ * to delete the channel (see consumer.c).
+ *
+ * As for the user space tracer now, the consumer creates and sends the
+ * stream to the session daemon which only sends them to the application
+ * once every stream of a channel is received making this value useless
+ * because we they will be added to the poll thread before the application
+ * receives them. This ensures that a stream can not hang up during
+ * initilization of a channel.
+ */
+ channel->nb_init_stream_left = 0;
+
+ /* The reply msg status is handled in the following call. */
+ ret = create_ust_channel(attr, &channel->uchan);
+ if (ret < 0) {
+ goto error;
+ }
+
+ channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
+
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Open all streams for this channel. */
+ ret = create_ust_streams(channel, ctx);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * Send all stream of a channel to the right thread handling it.
+ *
+ * On error, return a negative value else 0 on success.
+ */
+static int send_streams_to_thread(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx)