/* Should be called with RCU read-side lock held. */
bool stream_get(struct relay_stream *stream)
{
- bool has_ref = false;
-
- pthread_mutex_lock(&stream->reflock);
- if (stream->ref.refcount != 0) {
- has_ref = true;
- urcu_ref_get(&stream->ref);
- }
- pthread_mutex_unlock(&stream->reflock);
-
- return has_ref;
+ return urcu_ref_get_unless_zero(&stream->ref);
}
/*
stream = zmalloc(sizeof(struct relay_stream));
if (stream == NULL) {
PERROR("relay stream zmalloc");
- ret = -1;
goto error_no_alloc;
}
stream->tracefile_count = tracefile_count;
stream->path_name = path_name;
stream->channel_name = channel_name;
+ stream->rotate_at_seq_num = -1ULL;
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
- pthread_mutex_init(&stream->reflock, NULL);
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
/*
* Stream must be protected by holding the stream lock or by virtue of being
- * called from stream_destroy, in which case it is guaranteed to be accessed
- * from a single thread by the reflock.
+ * called from stream_destroy.
*/
static void stream_unpublish(struct relay_stream *stream)
{
/*
* No need to take stream->lock since this is only called on the final
* stream_put which ensures that a single thread may act on the stream.
- *
- * At that point, the object is also protected by the reflock which
- * guarantees that no other thread may share ownership of this stream.
*/
static void stream_release(struct urcu_ref *ref)
{
void stream_put(struct relay_stream *stream)
{
DBG("stream put for stream id %" PRIu64, stream->stream_handle);
- /*
- * Ensure existence of stream->reflock for stream unlock.
- */
rcu_read_lock();
- /*
- * Stream reflock ensures that concurrent test and update of
- * stream ref is atomic.
- */
- pthread_mutex_lock(&stream->reflock);
assert(stream->ref.refcount != 0);
/*
* Wait until we have processed all the stream packets before
stream->stream_handle,
(int) stream->ref.refcount);
urcu_ref_put(&stream->ref, stream_release);
- pthread_mutex_unlock(&stream->reflock);
rcu_read_unlock();
}
void try_stream_close(struct relay_stream *stream)
{
+ bool session_aborted;
+ struct relay_session *session = stream->trace->session;
+
DBG("Trying to close stream %" PRIu64, stream->stream_handle);
+
+ pthread_mutex_lock(&session->lock);
+ session_aborted = session->aborted;
+ pthread_mutex_unlock(&session->lock);
+
pthread_mutex_lock(&stream->lock);
/*
* Can be called concurently by connection close and reception of last
}
if (stream->last_net_seq_num != -1ULL &&
- ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
+ ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0
+ && !session_aborted) {
/*
* Don't close since we still have data pending. This
* handles cases where an explicit close command has