- /* Handle stream on the relayd if the output is on the network */
- if (relayd) {
- if (stream->metadata_flag) {
- /* Only lock if metadata since we use the control socket. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- }
-
- ret = consumer_handle_stream_before_relayd(stream, len);
- if (ret >= 0) {
- outfd = ret;
-
- /* Write metadata stream id before payload */
- if (stream->metadata_flag) {
- metadata_id = htobe64(stream->relayd_stream_id);
- do {
- ret = write(outfd, (void *) &metadata_id,
- sizeof(stream->relayd_stream_id));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write metadata stream id");
- written = ret;
- goto end;
- }
- DBG("Metadata stream id %zu written before data",
- stream->relayd_stream_id);
+ stream->chan = channel;
+
+error:
+ if (_alloc_ret) {
+ *_alloc_ret = alloc_ret;
+ }
+ return stream;
+}
+
+/*
+ * Send the given stream pointer to the corresponding thread.
+ *
+ * Returns 0 on success else a negative value.
+ */
+static int send_stream_to_thread(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret, stream_pipe;
+
+ /* Get the right pipe where the stream will be sent. */
+ if (stream->metadata_flag) {
+ stream_pipe = ctx->consumer_metadata_pipe[1];
+ } else {
+ stream_pipe = ctx->consumer_data_pipe[1];
+ }
+
+ do {
+ ret = write(stream_pipe, &stream, sizeof(stream));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("Consumer write %s stream to pipe %d",
+ stream->metadata_flag ? "metadata" : "data", stream_pipe);
+ }
+
+ return ret;
+}
+
+/*
+ * Search for a relayd object related to the stream. If found, send the stream
+ * to the relayd.
+ *
+ * On success, returns 0 else a negative value.
+ */
+static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(stream);
+
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd != NULL) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ /* Add stream on the relayd */
+ ret = relayd_add_stream(&relayd->control_sock, stream->name,
+ stream->chan->pathname, &stream->relayd_stream_id);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto error;
+ }
+ } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
+ stream->net_seq_idx);
+ ret = -1;
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * Create streams for the given channel using liblttng-ust-ctl.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int create_ust_streams(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret, cpu = 0;
+ struct ustctl_consumer_stream *ustream;
+ struct lttng_consumer_stream *stream;
+
+ assert(channel);
+ assert(ctx);
+
+ /*
+ * While a stream is available from ustctl. When NULL is returned, we've
+ * reached the end of the possible stream for the channel.
+ */
+ while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
+ int wait_fd;
+
+ wait_fd = ustctl_stream_get_wait_fd(ustream);
+
+ /* Allocate consumer stream object. */
+ stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
+ if (!stream) {
+ goto error_alloc;
+ }
+ stream->ustream = ustream;
+ /*
+ * Store it so we can save multiple function calls afterwards since
+ * this value is used heavily in the stream threads. This is UST
+ * specific so this is why it's done after allocation.
+ */
+ stream->wait_fd = wait_fd;
+
+ /*
+ * Order is important this is why a list is used. On error, the caller
+ * should clean this list.
+ */
+ cds_list_add_tail(&stream->send_node, &channel->streams.head);
+
+ ret = ustctl_get_max_subbuf_size(stream->ustream,
+ &stream->max_sb_size);
+ if (ret < 0) {
+ ERR("ustctl_get_max_subbuf_size failed for stream %s",
+ stream->name);
+ goto error;
+ }
+
+ /* Do actions once stream has been received. */
+ if (ctx->on_recv_stream) {
+ ret = ctx->on_recv_stream(stream);
+ if (ret < 0) {
+ goto error;