+ call_rcu(&ua_chan->rcu_head, delete_ust_app_channel_rcu);
+}
+
+int ust_app_register_done(struct ust_app *app)
+{
+ int ret;
+
+ pthread_mutex_lock(&app->sock_lock);
+ ret = ustctl_register_done(app->sock);
+ pthread_mutex_unlock(&app->sock_lock);
+ return ret;
+}
+
+int ust_app_release_object(struct ust_app *app, struct lttng_ust_object_data *data)
+{
+ int ret, sock;
+
+ if (app) {
+ pthread_mutex_lock(&app->sock_lock);
+ sock = app->sock;
+ } else {
+ sock = -1;
+ }
+ ret = ustctl_release_object(sock, data);
+ if (app) {
+ pthread_mutex_unlock(&app->sock_lock);
+ }
+ return ret;
+}
+
+/*
+ * Push metadata to consumer socket.
+ *
+ * RCU read-side lock must be held to guarantee existance of socket.
+ * Must be called with the ust app session lock held.
+ * Must be called with the registry lock held.
+ *
+ * On success, return the len of metadata pushed or else a negative value.
+ * Returning a -EPIPE return value means we could not send the metadata,
+ * but it can be caused by recoverable errors (e.g. the application has
+ * terminated concurrently).
+ */
+ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
+ struct consumer_socket *socket, int send_zero_data)
+{
+ int ret;
+ char *metadata_str = NULL;
+ size_t len, offset, new_metadata_len_sent;
+ ssize_t ret_val;
+ uint64_t metadata_key;
+
+ assert(registry);
+ assert(socket);
+
+ metadata_key = registry->metadata_key;
+
+ /*
+ * Means that no metadata was assigned to the session. This can
+ * happens if no start has been done previously.
+ */
+ if (!metadata_key) {
+ return 0;
+ }
+
+ /*
+ * On a push metadata error either the consumer is dead or the
+ * metadata channel has been destroyed because its endpoint
+ * might have died (e.g: relayd), or because the application has
+ * exited. If so, the metadata closed flag is set to 1 so we
+ * deny pushing metadata again which is not valid anymore on the
+ * consumer side.
+ */
+ if (registry->metadata_closed) {
+ return -EPIPE;
+ }
+
+ offset = registry->metadata_len_sent;
+ len = registry->metadata_len - registry->metadata_len_sent;
+ new_metadata_len_sent = registry->metadata_len;
+ if (len == 0) {
+ DBG3("No metadata to push for metadata key %" PRIu64,
+ registry->metadata_key);
+ ret_val = len;
+ if (send_zero_data) {
+ DBG("No metadata to push");
+ goto push_data;
+ }
+ goto end;
+ }
+
+ /* Allocate only what we have to send. */
+ metadata_str = zmalloc(len);
+ if (!metadata_str) {
+ PERROR("zmalloc ust app metadata string");
+ ret_val = -ENOMEM;
+ goto error;
+ }
+ /* Copy what we haven't sent out. */
+ memcpy(metadata_str, registry->metadata + offset, len);
+
+push_data:
+ pthread_mutex_unlock(®istry->lock);
+ /*
+ * We need to unlock the registry while we push metadata to
+ * break a circular dependency between the consumerd metadata
+ * lock and the sessiond registry lock. Indeed, pushing metadata
+ * to the consumerd awaits that it gets pushed all the way to
+ * relayd, but doing so requires grabbing the metadata lock. If
+ * a concurrent metadata request is being performed by
+ * consumerd, this can try to grab the registry lock on the
+ * sessiond while holding the metadata lock on the consumer
+ * daemon. Those push and pull schemes are performed on two
+ * different bidirectionnal communication sockets.
+ */
+ ret = consumer_push_metadata(socket, metadata_key,
+ metadata_str, len, offset);
+ pthread_mutex_lock(®istry->lock);
+ if (ret < 0) {
+ /*
+ * There is an acceptable race here between the registry
+ * metadata key assignment and the creation on the
+ * consumer. The session daemon can concurrently push
+ * metadata for this registry while being created on the
+ * consumer since the metadata key of the registry is
+ * assigned *before* it is setup to avoid the consumer
+ * to ask for metadata that could possibly be not found
+ * in the session daemon.
+ *
+ * The metadata will get pushed either by the session
+ * being stopped or the consumer requesting metadata if
+ * that race is triggered.
+ */
+ if (ret == -LTTCOMM_CONSUMERD_CHANNEL_FAIL) {
+ ret = 0;
+ } else {
+ ERR("Error pushing metadata to consumer");
+ }
+ ret_val = ret;
+ goto error_push;
+ } else {
+ /*
+ * Metadata may have been concurrently pushed, since
+ * we're not holding the registry lock while pushing to
+ * consumer. This is handled by the fact that we send
+ * the metadata content, size, and the offset at which
+ * that metadata belongs. This may arrive out of order
+ * on the consumer side, and the consumer is able to
+ * deal with overlapping fragments. The consumer
+ * supports overlapping fragments, which must be
+ * contiguous starting from offset 0. We keep the
+ * largest metadata_len_sent value of the concurrent
+ * send.
+ */
+ registry->metadata_len_sent =
+ max_t(size_t, registry->metadata_len_sent,
+ new_metadata_len_sent);
+ }
+ free(metadata_str);
+ return len;
+
+end:
+error:
+ if (ret_val) {
+ /*
+ * On error, flag the registry that the metadata is
+ * closed. We were unable to push anything and this
+ * means that either the consumer is not responding or
+ * the metadata cache has been destroyed on the
+ * consumer.
+ */
+ registry->metadata_closed = 1;
+ }
+error_push:
+ free(metadata_str);
+ return ret_val;
+}
+
+/*
+ * For a given application and session, push metadata to consumer.
+ * Either sock or consumer is required : if sock is NULL, the default
+ * socket to send the metadata is retrieved from consumer, if sock
+ * is not NULL we use it to send the metadata.
+ * RCU read-side lock must be held while calling this function,
+ * therefore ensuring existance of registry. It also ensures existance
+ * of socket throughout this function.
+ *
+ * Return 0 on success else a negative error.
+ * Returning a -EPIPE return value means we could not send the metadata,
+ * but it can be caused by recoverable errors (e.g. the application has
+ * terminated concurrently).
+ */
+static int push_metadata(struct ust_registry_session *registry,
+ struct consumer_output *consumer)
+{
+ int ret_val;
+ ssize_t ret;
+ struct consumer_socket *socket;
+
+ assert(registry);
+ assert(consumer);
+
+ pthread_mutex_lock(®istry->lock);
+ if (registry->metadata_closed) {
+ ret_val = -EPIPE;
+ goto error;
+ }
+
+ /* Get consumer socket to use to push the metadata.*/
+ socket = consumer_find_socket_by_bitness(registry->bits_per_long,
+ consumer);
+ if (!socket) {
+ ret_val = -1;
+ goto error;
+ }
+
+ ret = ust_app_push_metadata(registry, socket, 0);
+ if (ret < 0) {
+ ret_val = ret;
+ goto error;
+ }
+ pthread_mutex_unlock(®istry->lock);
+ return 0;
+
+error:
+ pthread_mutex_unlock(®istry->lock);
+ return ret_val;
+}
+
+/*
+ * Send to the consumer a close metadata command for the given session. Once
+ * done, the metadata channel is deleted and the session metadata pointer is
+ * nullified. The session lock MUST be held unless the application is
+ * in the destroy path.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int close_metadata(struct ust_registry_session *registry,
+ struct consumer_output *consumer)
+{
+ int ret;
+ struct consumer_socket *socket;
+
+ assert(registry);
+ assert(consumer);
+
+ rcu_read_lock();
+
+ pthread_mutex_lock(®istry->lock);
+
+ if (!registry->metadata_key || registry->metadata_closed) {
+ ret = 0;
+ goto end;
+ }
+
+ /* Get consumer socket to use to push the metadata.*/
+ socket = consumer_find_socket_by_bitness(registry->bits_per_long,
+ consumer);
+ if (!socket) {
+ ret = -1;
+ goto error;
+ }
+
+ ret = consumer_close_metadata(socket, registry->metadata_key);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ /*
+ * Metadata closed. Even on error this means that the consumer is not
+ * responding or not found so either way a second close should NOT be emit
+ * for this registry.
+ */
+ registry->metadata_closed = 1;
+end:
+ pthread_mutex_unlock(®istry->lock);
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * We need to execute ht_destroy outside of RCU read-side critical
+ * section and outside of call_rcu thread, so we postpone its execution
+ * using ht_cleanup_push. It is simpler than to change the semantic of
+ * the many callers of delete_ust_app_session().
+ */
+static
+void delete_ust_app_session_rcu(struct rcu_head *head)
+{
+ struct ust_app_session *ua_sess =
+ caa_container_of(head, struct ust_app_session, rcu_head);
+
+ ht_cleanup_push(ua_sess->channels);
+ free(ua_sess);