+ case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
+ {
+ int ret;
+ struct ustctl_consumer_channel_attr attr;
+
+ /* Create a plain object and reserve a channel key. */
+ channel = allocate_channel(msg.u.ask_channel.session_id,
+ msg.u.ask_channel.pathname, msg.u.ask_channel.name,
+ msg.u.ask_channel.uid, msg.u.ask_channel.gid,
+ msg.u.ask_channel.relayd_id, msg.u.ask_channel.key,
+ (enum lttng_event_output) msg.u.ask_channel.output,
+ msg.u.ask_channel.tracefile_size,
+ msg.u.ask_channel.tracefile_count,
+ msg.u.ask_channel.session_id_per_pid,
+ msg.u.ask_channel.monitor);
+ if (!channel) {
+ goto end_channel_error;
+ }
+
+ /* Build channel attributes from received message. */
+ attr.subbuf_size = msg.u.ask_channel.subbuf_size;
+ attr.num_subbuf = msg.u.ask_channel.num_subbuf;
+ attr.overwrite = msg.u.ask_channel.overwrite;
+ attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
+ attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
+ attr.chan_id = msg.u.ask_channel.chan_id;
+ memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
+
+ /* Translate and save channel type. */
+ switch (msg.u.ask_channel.type) {
+ case LTTNG_UST_CHAN_PER_CPU:
+ channel->type = CONSUMER_CHANNEL_TYPE_DATA;
+ attr.type = LTTNG_UST_CHAN_PER_CPU;
+ /*
+ * Set refcount to 1 for owner. Below, we will
+ * pass ownership to the
+ * consumer_thread_channel_poll() thread.
+ */
+ channel->refcount = 1;
+ break;
+ case LTTNG_UST_CHAN_METADATA:
+ channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
+ attr.type = LTTNG_UST_CHAN_METADATA;
+ break;
+ default:
+ assert(0);
+ goto error_fatal;
+ };
+
+ ret = ask_channel(ctx, sock, channel, &attr);
+ if (ret < 0) {
+ goto end_channel_error;
+ }
+
+ if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
+ ret = consumer_metadata_cache_allocate(channel);
+ if (ret < 0) {
+ ERR("Allocating metadata cache");
+ goto end_channel_error;
+ }
+ consumer_timer_switch_start(channel, attr.switch_timer_interval);
+ attr.switch_timer_interval = 0;
+ }
+
+ /*
+ * Add the channel to the internal state AFTER all streams were created
+ * and successfully sent to session daemon. This way, all streams must
+ * be ready before this channel is visible to the threads.
+ * If add_channel succeeds, ownership of the channel is
+ * passed to consumer_thread_channel_poll().
+ */
+ ret = add_channel(channel, ctx);
+ if (ret < 0) {
+ if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
+ if (channel->switch_timer_enabled == 1) {
+ consumer_timer_switch_stop(channel);
+ }
+ consumer_metadata_cache_destroy(channel);
+ }
+ goto end_channel_error;
+ }
+
+ /*
+ * Channel and streams are now created. Inform the session daemon that
+ * everything went well and should wait to receive the channel and
+ * streams with ustctl API.
+ */
+ ret = consumer_send_status_channel(sock, channel);
+ if (ret < 0) {
+ /*
+ * There is probably a problem on the socket.
+ */
+ goto error_fatal;
+ }
+
+ break;
+ }
+ case LTTNG_CONSUMER_GET_CHANNEL:
+ {
+ int ret, relayd_err = 0;
+ uint64_t key = msg.u.get_channel.key;
+ struct lttng_consumer_channel *channel;
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("UST consumer get channel key %" PRIu64 " not found", key);
+ ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto end_msg_sessiond;
+ }
+
+ /* Send everything to sessiond. */
+ ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
+ if (ret < 0) {
+ if (relayd_err) {
+ /*
+ * We were unable to send to the relayd the stream so avoid
+ * sending back a fatal error to the thread since this is OK
+ * and the consumer can continue its work. The above call
+ * has sent the error status message to the sessiond.
+ */
+ goto end_nosignal;
+ }
+ /*
+ * The communicaton was broken hence there is a bad state between
+ * the consumer and sessiond so stop everything.
+ */
+ goto error_fatal;
+ }
+
+ ret = send_streams_to_thread(channel, ctx);
+ if (ret < 0) {
+ /*
+ * If we are unable to send the stream to the thread, there is
+ * a big problem so just stop everything.
+ */
+ goto error_fatal;
+ }
+ /* List MUST be empty after or else it could be reused. */
+ assert(cds_list_empty(&channel->streams.head));
+
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_DESTROY_CHANNEL:
+ {
+ uint64_t key = msg.u.destroy_channel.key;
+
+ /*
+ * Only called if streams have not been sent to stream
+ * manager thread. However, channel has been sent to
+ * channel manager thread.
+ */
+ notify_thread_del_channel(ctx, key);
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_CLOSE_METADATA:
+ {
+ int ret;
+
+ ret = close_metadata(msg.u.close_metadata.key);
+ if (ret != 0) {
+ ret_code = ret;
+ }
+
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_FLUSH_CHANNEL:
+ {
+ int ret;
+
+ ret = flush_channel(msg.u.flush_channel.key);
+ if (ret != 0) {
+ ret_code = ret;
+ }
+
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_PUSH_METADATA:
+ {
+ int ret;
+ uint64_t len = msg.u.push_metadata.len;
+ uint64_t key = msg.u.push_metadata.key;
+ uint64_t offset = msg.u.push_metadata.target_offset;
+ struct lttng_consumer_channel *channel;
+
+ DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key,
+ len);
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("UST consumer push metadata %" PRIu64 " not found", key);
+ ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto end_msg_sessiond;
+ }
+
+ /* Tell session daemon we are ready to receive the metadata. */
+ ret = consumer_send_status_msg(sock, LTTNG_OK);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error_fatal;
+ }
+
+ /* Wait for more data. */
+ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ goto error_fatal;
+ }
+
+ ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
+ len, channel);
+ if (ret < 0) {
+ /* error receiving from sessiond */
+ goto error_fatal;
+ } else {
+ ret_code = ret;
+ goto end_msg_sessiond;
+ }
+ }
+ case LTTNG_CONSUMER_SETUP_METADATA:
+ {
+ int ret;
+
+ ret = setup_metadata(ctx, msg.u.setup_metadata.key);
+ if (ret) {
+ ret_code = ret;
+ }
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
+ {
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+ break;
+ }