- if (session->metadata_stream_fd >= 0) {
- /* Send metadata channel fd */
- lkm.cmd_type = LTTNG_CONSUMER_ADD_CHANNEL;
- lkm.u.channel.channel_key = session->metadata->fd;
- lkm.u.channel.max_sb_size = session->metadata->conf->attr.subbuf_size;
- lkm.u.channel.mmap_len = 0; /* for kernel */
- DBG("Sending metadata channel %d to consumer", lkm.u.channel.channel_key);
- ret = lttcomm_send_unix_sock(sock, &lkm, sizeof(lkm));
+ health_code_update();
+
+error:
+ return ret;
+}
+
+/*
+ * Sending the notification that all streams were sent with STREAMS_SENT.
+ */
+int kernel_consumer_streams_sent(struct consumer_socket *sock,
+ struct ltt_kernel_session *session, uint64_t channel_key)
+{
+ int ret;
+ struct lttcomm_consumer_msg lkm;
+ struct consumer_output *consumer;
+
+ assert(sock);
+ assert(session);
+
+ DBG("Sending streams_sent");
+ /* Get consumer output pointer */
+ consumer = session->consumer;
+
+ /* Prep stream consumer message */
+ consumer_init_streams_sent_comm_msg(&lkm,
+ LTTNG_CONSUMER_STREAMS_SENT,
+ channel_key, consumer->net_seq_index);
+
+ health_code_update();
+
+ /* Send stream and file descriptor */
+ ret = consumer_send_msg(sock, &lkm);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * Send all stream fds of kernel channel to the consumer.
+ */
+int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
+ struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
+ unsigned int monitor)
+{
+ int ret = LTTNG_OK;
+ struct ltt_kernel_stream *stream;
+
+ /* Safety net */
+ assert(channel);
+ assert(session);
+ assert(session->consumer);
+ assert(sock);
+
+ /* Bail out if consumer is disabled */
+ if (!session->consumer->enabled) {
+ ret = LTTNG_OK;
+ goto error;
+ }
+
+ DBG("Sending streams of channel %s to kernel consumer",
+ channel->channel->name);
+
+ if (!channel->sent_to_consumer) {
+ ret = kernel_consumer_add_channel(sock, channel, session, monitor);