projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Fix: unhandled prev_seq initial value
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
stream.c
diff --git
a/src/bin/lttng-relayd/stream.c
b/src/bin/lttng-relayd/stream.c
index b604919a78b376062cc52ed8a0a5b189d0be3143..5ed37c58ec1d2c0956b550ac837bac314d1e33a0 100644
(file)
--- a/
src/bin/lttng-relayd/stream.c
+++ b/
src/bin/lttng-relayd/stream.c
@@
-17,7
+17,6
@@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <common/common.h>
#include <common/utils.h>
#define _LGPL_SOURCE
#include <common/common.h>
#include <common/utils.h>
@@
-33,16
+32,7
@@
/* Should be called with RCU read-side lock held. */
bool stream_get(struct relay_stream *stream)
{
/* Should be called with RCU read-side lock held. */
bool stream_get(struct relay_stream *stream)
{
- bool has_ref = false;
-
- pthread_mutex_lock(&stream->reflock);
- if (stream->ref.refcount != 0) {
- has_ref = true;
- urcu_ref_get(&stream->ref);
- }
- pthread_mutex_unlock(&stream->reflock);
-
- return has_ref;
+ return urcu_ref_get_unless_zero(&stream->ref);
}
/*
}
/*
@@
-87,7
+77,6
@@
struct relay_stream *stream_create(struct ctf_trace *trace,
stream = zmalloc(sizeof(struct relay_stream));
if (stream == NULL) {
PERROR("relay stream zmalloc");
stream = zmalloc(sizeof(struct relay_stream));
if (stream == NULL) {
PERROR("relay stream zmalloc");
- ret = -1;
goto error_no_alloc;
}
goto error_no_alloc;
}
@@
-99,9
+88,9
@@
struct relay_stream *stream_create(struct ctf_trace *trace,
stream->tracefile_count = tracefile_count;
stream->path_name = path_name;
stream->channel_name = channel_name;
stream->tracefile_count = tracefile_count;
stream->path_name = path_name;
stream->channel_name = channel_name;
+ stream->rotate_at_seq_num = -1ULL;
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
- pthread_mutex_init(&stream->reflock, NULL);
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
@@
-149,7
+138,7
@@
struct relay_stream *stream_create(struct ctf_trace *trace,
DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
}
DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
}
- if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
+ if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME,
LTTNG_
NAME_MAX)) {
stream->is_metadata = 1;
}
stream->is_metadata = 1;
}
@@
-228,8
+217,7
@@
unlock:
/*
* Stream must be protected by holding the stream lock or by virtue of being
/*
* Stream must be protected by holding the stream lock or by virtue of being
- * called from stream_destroy, in which case it is guaranteed to be accessed
- * from a single thread by the reflock.
+ * called from stream_destroy.
*/
static void stream_unpublish(struct relay_stream *stream)
{
*/
static void stream_unpublish(struct relay_stream *stream)
{
@@
-253,6
+241,11
@@
static void stream_unpublish(struct relay_stream *stream)
static void stream_destroy(struct relay_stream *stream)
{
if (stream->indexes_ht) {
static void stream_destroy(struct relay_stream *stream)
{
if (stream->indexes_ht) {
+ /*
+ * Calling lttng_ht_destroy in call_rcu worker thread so
+ * we don't hold the RCU read-side lock while calling
+ * it.
+ */
lttng_ht_destroy(stream->indexes_ht);
}
if (stream->tfa) {
lttng_ht_destroy(stream->indexes_ht);
}
if (stream->tfa) {
@@
-274,9
+267,6
@@
static void stream_destroy_rcu(struct rcu_head *rcu_head)
/*
* No need to take stream->lock since this is only called on the final
* stream_put which ensures that a single thread may act on the stream.
/*
* No need to take stream->lock since this is only called on the final
* stream_put which ensures that a single thread may act on the stream.
- *
- * At that point, the object is also protected by the reflock which
- * guarantees that no other thread may share ownership of this stream.
*/
static void stream_release(struct urcu_ref *ref)
{
*/
static void stream_release(struct urcu_ref *ref)
{
@@
-302,9
+292,9
@@
static void stream_release(struct urcu_ref *ref)
stream_fd_put(stream->stream_fd);
stream->stream_fd = NULL;
}
stream_fd_put(stream->stream_fd);
stream->stream_fd = NULL;
}
- if (stream->index_f
d
) {
-
stream_fd_put(stream->index_fd
);
- stream->index_f
d
= NULL;
+ if (stream->index_f
ile
) {
+
lttng_index_file_put(stream->index_file
);
+ stream->index_f
ile
= NULL;
}
if (stream->trace) {
ctf_trace_put(stream->trace);
}
if (stream->trace) {
ctf_trace_put(stream->trace);
@@
-317,15
+307,7
@@
static void stream_release(struct urcu_ref *ref)
void stream_put(struct relay_stream *stream)
{
DBG("stream put for stream id %" PRIu64, stream->stream_handle);
void stream_put(struct relay_stream *stream)
{
DBG("stream put for stream id %" PRIu64, stream->stream_handle);
- /*
- * Ensure existence of stream->reflock for stream unlock.
- */
rcu_read_lock();
rcu_read_lock();
- /*
- * Stream reflock ensures that concurrent test and update of
- * stream ref is atomic.
- */
- pthread_mutex_lock(&stream->reflock);
assert(stream->ref.refcount != 0);
/*
* Wait until we have processed all the stream packets before
assert(stream->ref.refcount != 0);
/*
* Wait until we have processed all the stream packets before
@@
-335,13
+317,20
@@
void stream_put(struct relay_stream *stream)
stream->stream_handle,
(int) stream->ref.refcount);
urcu_ref_put(&stream->ref, stream_release);
stream->stream_handle,
(int) stream->ref.refcount);
urcu_ref_put(&stream->ref, stream_release);
- pthread_mutex_unlock(&stream->reflock);
rcu_read_unlock();
}
void try_stream_close(struct relay_stream *stream)
{
rcu_read_unlock();
}
void try_stream_close(struct relay_stream *stream)
{
+ bool session_aborted;
+ struct relay_session *session = stream->trace->session;
+
DBG("Trying to close stream %" PRIu64, stream->stream_handle);
DBG("Trying to close stream %" PRIu64, stream->stream_handle);
+
+ pthread_mutex_lock(&session->lock);
+ session_aborted = session->aborted;
+ pthread_mutex_unlock(&session->lock);
+
pthread_mutex_lock(&stream->lock);
/*
* Can be called concurently by connection close and reception of last
pthread_mutex_lock(&stream->lock);
/*
* Can be called concurently by connection close and reception of last
@@
-383,7
+372,8
@@
void try_stream_close(struct relay_stream *stream)
}
if (stream->last_net_seq_num != -1ULL &&
}
if (stream->last_net_seq_num != -1ULL &&
- ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
+ ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0
+ && !session_aborted) {
/*
* Don't close since we still have data pending. This
* handles cases where an explicit close command has
/*
* Don't close since we still have data pending. This
* handles cases where an explicit close command has
@@
-442,6
+432,10
@@
void print_relay_streams(void)
struct lttng_ht_iter iter;
struct relay_stream *stream;
struct lttng_ht_iter iter;
struct relay_stream *stream;
+ if (!relay_streams_ht) {
+ return;
+ }
+
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
node.node) {
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
node.node) {
This page took
0.02732 seconds
and
5
git commands to generate.