+
+static void rcu_destroy_session(struct rcu_head *rcu_head)
+{
+ struct relay_session *session =
+ caa_container_of(rcu_head, struct relay_session,
+ rcu_node);
+ /*
+ * Since each trace has a reference on the session, it means
+ * that if we are at the point where we teardown the session, no
+ * trace belonging to that session exist at this point.
+ * Calling lttng_ht_destroy in call_rcu worker thread so we
+ * don't hold the RCU read-side lock while calling it.
+ */
+ lttng_ht_destroy(session->ctf_traces_ht);
+ free(session);
+}
+
+/*
+ * Delete session from the given hash table.
+ *
+ * Return lttng ht del error code being 0 on success and 1 on failure.
+ */
+static int session_delete(struct relay_session *session)
+{
+ struct lttng_ht_iter iter;
+
+ iter.iter.node = &session->session_n.node;
+ return lttng_ht_del(sessions_ht, &iter);
+}
+
+
+static void destroy_session(struct relay_session *session)
+{
+ int ret;
+
+ ret = session_delete(session);
+ assert(!ret);
+ call_rcu(&session->rcu_node, rcu_destroy_session);
+}
+
+void session_release(struct urcu_ref *ref)
+{
+ struct relay_session *session =
+ caa_container_of(ref, struct relay_session, ref);
+
+ destroy_session(session);
+}
+
+void session_put(struct relay_session *session)
+{
+ rcu_read_lock();
+ pthread_mutex_lock(&session->reflock);
+ urcu_ref_put(&session->ref, session_release);
+ pthread_mutex_unlock(&session->reflock);
+ rcu_read_unlock();
+}
+
+int session_close(struct relay_session *session)
+{
+ int ret = 0;
+ struct ctf_trace *trace;
+ struct lttng_ht_iter iter;
+ struct relay_stream *stream;
+
+ pthread_mutex_lock(&session->lock);
+ DBG("closing session %" PRIu64 ": is conn already closed %d",
+ session->id, session->connection_closed);
+ if (session->connection_closed) {
+ ret = -1;
+ goto unlock;
+ }
+ session->connection_closed = true;
+unlock:
+ pthread_mutex_unlock(&session->lock);
+ if (ret) {
+ return ret;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(session->ctf_traces_ht->ht,
+ &iter.iter, trace, node.node) {
+ ret = ctf_trace_close(trace);
+ if (ret) {
+ goto rcu_unlock;
+ }
+ }
+ cds_list_for_each_entry_rcu(stream, &session->recv_list,
+ recv_node) {
+ /* Close streams which have not been published yet. */
+ try_stream_close(stream);
+ }
+rcu_unlock:
+ rcu_read_unlock();
+ if (ret) {
+ return ret;
+ }
+ /* Put self-reference from create. */
+ session_put(session);
+ return ret;
+}
+
+void print_sessions(void)
+{
+ struct lttng_ht_iter iter;
+ struct relay_session *session;
+
+ if (!sessions_ht) {
+ return;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
+ session_n.node) {
+ if (!session_get(session)) {
+ continue;
+ }
+ DBG("session %p refcount %ld session %" PRIu64,
+ session,
+ session->ref.refcount,
+ session->id);
+ session_put(session);
+ }
+ rcu_read_unlock();
+}