+ DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
+
+error:
+ return ret;
+}
+
+/*
+ * Allocate and return a consumer channel object.
+ */
+static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
+ const char *pathname, const char *name, uid_t uid, gid_t gid,
+ int relayd_id, uint64_t key, enum lttng_event_output output)
+{
+ assert(pathname);
+ assert(name);
+
+ return consumer_allocate_channel(key, session_id, pathname, name, uid, gid,
+ relayd_id, output);
+}
+
+/*
+ * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the
+ * error value if applicable is set in it else it is kept untouched.
+ *
+ * Return NULL on error else the newly allocated stream object.
+ */
+static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
+ struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx, int *_alloc_ret)
+{
+ int alloc_ret;
+ struct lttng_consumer_stream *stream = NULL;
+
+ assert(channel);
+ assert(ctx);
+
+ stream = consumer_allocate_stream(channel->key,
+ key,
+ LTTNG_CONSUMER_ACTIVE_STREAM,
+ channel->name,
+ channel->uid,
+ channel->gid,
+ channel->relayd_id,
+ channel->session_id,
+ cpu,
+ &alloc_ret,
+ channel->type);
+ if (stream == NULL) {
+ switch (alloc_ret) {
+ case -ENOENT:
+ /*
+ * We could not find the channel. Can happen if cpu hotplug
+ * happens while tearing down.
+ */
+ DBG3("Could not find channel");
+ break;
+ case -ENOMEM:
+ case -EINVAL:
+ default:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ break;
+ }
+ goto error;
+ }
+
+ stream->chan = channel;
+
+error:
+ if (_alloc_ret) {
+ *_alloc_ret = alloc_ret;
+ }
+ return stream;
+}
+
+/*
+ * Send the given stream pointer to the corresponding thread.
+ *
+ * Returns 0 on success else a negative value.
+ */
+static int send_stream_to_thread(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret, stream_pipe;
+
+ /* Get the right pipe where the stream will be sent. */
+ if (stream->metadata_flag) {
+ stream_pipe = ctx->consumer_metadata_pipe[1];
+ } else {
+ stream_pipe = ctx->consumer_data_pipe[1];
+ }
+
+ do {
+ ret = write(stream_pipe, &stream, sizeof(stream));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("Consumer write %s stream to pipe %d",
+ stream->metadata_flag ? "metadata" : "data", stream_pipe);
+ }
+
+ return ret;
+}
+
+/*
+ * Search for a relayd object related to the stream. If found, send the stream
+ * to the relayd.
+ *
+ * On success, returns 0 else a negative value.
+ */
+static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(stream);
+
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd != NULL) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ /* Add stream on the relayd */
+ ret = relayd_add_stream(&relayd->control_sock, stream->name,
+ stream->chan->pathname, &stream->relayd_stream_id);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto error;
+ }
+ } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
+ stream->net_seq_idx);
+ ret = -1;
+ goto error;