+
+/*
+ * Check if data is still being extracted from the buffers for a specific
+ * stream. Consumer data lock MUST be acquired before calling this function
+ * and the stream lock.
+ *
+ * Return 1 if the traced data are still getting read else 0 meaning that the
+ * data is available for trace viewer reading.
+ */
+int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+ assert(stream->ustream);
+
+ DBG("UST consumer checking data pending");
+
+ ret = ustctl_get_next_subbuf(stream->ustream);
+ if (ret == 0) {
+ /* There is still data so let's put back this subbuffer. */
+ ret = ustctl_put_subbuf(stream->ustream);
+ assert(ret == 0);
+ ret = 1; /* Data is pending */
+ goto end;
+ }
+
+ /* Data is NOT pending so ready to be read. */
+ ret = 0;
+
+end:
+ return ret;
+}
+
+/*
+ * Close every metadata stream wait fd of the metadata hash table. This
+ * function MUST be used very carefully so not to run into a race between the
+ * metadata thread handling streams and this function closing their wait fd.
+ *
+ * For UST, this is used when the session daemon hangs up. Its the metadata
+ * producer so calling this is safe because we are assured that no state change
+ * can occur in the metadata thread for the streams in the hash table.
+ */
+void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ assert(metadata_ht);
+ assert(metadata_ht->ht);
+
+ DBG("UST consumer closing all metadata streams");
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
+ node.node) {
+ int fd = stream->wait_fd;
+
+ /*
+ * Whatever happens here we have to continue to try to close every
+ * streams. Let's report at least the error on failure.
+ */
+ ret = ustctl_stream_close_wakeup_fd(stream->ustream);
+ if (ret) {
+ ERR("Unable to close metadata stream fd %d ret %d", fd, ret);
+ }
+ DBG("Metadata wait fd %d closed", fd);
+ }
+ rcu_read_unlock();
+}
+
+void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ ret = ustctl_stream_close_wakeup_fd(stream->ustream);
+ if (ret < 0) {
+ ERR("Unable to close wakeup fd");
+ }
+}
+
+int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_channel *channel)
+{
+ struct lttcomm_metadata_request_msg request;
+ struct lttcomm_consumer_msg msg;
+ enum lttng_error_code ret_code = LTTNG_OK;
+ uint64_t len, key, offset;
+ int ret;
+
+ assert(channel);
+ assert(channel->metadata_cache);
+
+ /* send the metadata request to sessiond */
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER64_UST:
+ request.bits_per_long = 64;
+ break;
+ case LTTNG_CONSUMER32_UST:
+ request.bits_per_long = 32;
+ break;
+ default:
+ request.bits_per_long = 0;
+ break;
+ }
+
+ request.session_id = channel->session_id;
+ request.uid = channel->uid;
+ request.key = channel->key;
+ DBG("Sending metadata request to sessiond, session %" PRIu64,
+ channel->session_id);
+
+ ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
+ sizeof(request));
+ if (ret < 0) {
+ ERR("Asking metadata to sessiond");
+ goto end;
+ }
+
+ /* Receive the metadata from sessiond */
+ ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
+ sizeof(msg));
+ if (ret != sizeof(msg)) {
+ DBG("Consumer received unexpected message size %d (expects %zu)",
+ ret, sizeof(msg));
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+ /*
+ * The ret value might 0 meaning an orderly shutdown but this is ok
+ * since the caller handles this.
+ */
+ goto end;
+ }
+
+ if (msg.cmd_type == LTTNG_ERR_UND) {
+ /* No registry found */
+ (void) consumer_send_status_msg(ctx->consumer_metadata_socket,
+ ret_code);
+ ret = 0;
+ goto end;
+ } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
+ ERR("Unexpected cmd_type received %d", msg.cmd_type);
+ ret = -1;
+ goto end;
+ }
+
+ len = msg.u.push_metadata.len;
+ key = msg.u.push_metadata.key;
+ offset = msg.u.push_metadata.target_offset;
+
+ assert(key == channel->key);
+ if (len == 0) {
+ DBG("No new metadata to receive for key %" PRIu64, key);
+ }
+
+ /* Tell session daemon we are ready to receive the metadata. */
+ ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
+ LTTNG_OK);
+ if (ret < 0 || len == 0) {
+ /*
+ * Somehow, the session daemon is not responding anymore or there is
+ * nothing to receive.
+ */
+ goto end;
+ }
+
+ ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+ key, offset, len, channel);
+ if (ret_code >= 0) {
+ /*
+ * Only send the status msg if the sessiond is alive meaning a positive
+ * ret code.
+ */
+ (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code);
+ }
+ ret = 0;
+
+end:
+ return ret;
+}