+ DBG("stream put for stream id %" PRIu64, stream->stream_handle);
+ /*
+ * Ensure existence of stream->reflock for stream unlock.
+ */
+ rcu_read_lock();
+ /*
+ * Stream reflock ensures that concurrent test and update of
+ * stream ref is atomic.
+ */
+ pthread_mutex_lock(&stream->reflock);
+ assert(stream->ref.refcount != 0);
+ /*
+ * Wait until we have processed all the stream packets before
+ * actually putting our last stream reference.
+ */
+ DBG("stream put stream id %" PRIu64 " refcount %d",
+ stream->stream_handle,
+ (int) stream->ref.refcount);
+ urcu_ref_put(&stream->ref, stream_release);
+ pthread_mutex_unlock(&stream->reflock);
+ rcu_read_unlock();
+}
+
+void stream_close(struct relay_stream *stream)
+{
+ DBG("closing stream %" PRIu64, stream->stream_handle);
+ pthread_mutex_lock(&stream->lock);
+ stream->closed = true;
+ relay_index_close_all(stream);
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+}
+
+void print_relay_streams(void)
+{
+ struct lttng_ht_iter iter;
+ struct relay_stream *stream;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (!stream_get(stream)) {
+ continue;
+ }
+ DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
+ " session %" PRIu64,
+ stream,
+ stream->ref.refcount,
+ stream->stream_handle,
+ stream->trace->id,
+ stream->trace->session->id);
+ stream_put(stream);
+ }
+ rcu_read_unlock();