Fix relayd: stream index file created in the wrong directory
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index 02bf9dd54995ed2aab6d026ecb578d9127610908..41d44a58598ff9033dfea8cdd69d44f0a63de75b 100644 (file)
@@ -17,7 +17,6 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <common/common.h>
 #include <common/utils.h>
 /* Should be called with RCU read-side lock held. */
 bool stream_get(struct relay_stream *stream)
 {
-       bool has_ref = false;
-
-       pthread_mutex_lock(&stream->reflock);
-       if (stream->ref.refcount != 0) {
-               has_ref = true;
-               urcu_ref_get(&stream->ref);
-       }
-       pthread_mutex_unlock(&stream->reflock);
-
-       return has_ref;
+       return urcu_ref_get_unless_zero(&stream->ref);
 }
 
 /*
@@ -78,7 +68,8 @@ end:
 struct relay_stream *stream_create(struct ctf_trace *trace,
        uint64_t stream_handle, char *path_name,
        char *channel_name, uint64_t tracefile_size,
-       uint64_t tracefile_count)
+       uint64_t tracefile_count,
+       const struct relay_stream_chunk_id *chunk_id)
 {
        int ret;
        struct relay_stream *stream = NULL;
@@ -87,24 +78,26 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
        stream = zmalloc(sizeof(struct relay_stream));
        if (stream == NULL) {
                PERROR("relay stream zmalloc");
-               ret = -1;
                goto error_no_alloc;
        }
 
        stream->stream_handle = stream_handle;
-       stream->prev_seq = -1ULL;
+       stream->prev_data_seq = -1ULL;
+       stream->prev_index_seq = -1ULL;
        stream->last_net_seq_num = -1ULL;
        stream->ctf_stream_id = -1ULL;
        stream->tracefile_size = tracefile_size;
        stream->tracefile_count = tracefile_count;
        stream->path_name = path_name;
+       stream->prev_path_name = NULL;
        stream->channel_name = channel_name;
+       stream->rotate_at_seq_num = -1ULL;
        lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
        pthread_mutex_init(&stream->lock, NULL);
-       pthread_mutex_init(&stream->reflock, NULL);
        urcu_ref_init(&stream->ref);
        ctf_trace_get(trace);
        stream->trace = trace;
+       stream->current_chunk_id = *chunk_id;
 
        stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!stream->indexes_ht) {
@@ -228,8 +221,7 @@ unlock:
 
 /*
  * Stream must be protected by holding the stream lock or by virtue of being
- * called from stream_destroy, in which case it is guaranteed to be accessed
- * from a single thread by the reflock.
+ * called from stream_destroy.
  */
 static void stream_unpublish(struct relay_stream *stream)
 {
@@ -264,6 +256,7 @@ static void stream_destroy(struct relay_stream *stream)
                tracefile_array_destroy(stream->tfa);
        }
        free(stream->path_name);
+       free(stream->prev_path_name);
        free(stream->channel_name);
        free(stream);
 }
@@ -279,9 +272,6 @@ static void stream_destroy_rcu(struct rcu_head *rcu_head)
 /*
  * No need to take stream->lock since this is only called on the final
  * stream_put which ensures that a single thread may act on the stream.
- *
- * At that point, the object is also protected by the reflock which
- * guarantees that no other thread may share ownership of this stream.
  */
 static void stream_release(struct urcu_ref *ref)
 {
@@ -307,9 +297,9 @@ static void stream_release(struct urcu_ref *ref)
                stream_fd_put(stream->stream_fd);
                stream->stream_fd = NULL;
        }
-       if (stream->index_fd) {
-               stream_fd_put(stream->index_fd);
-               stream->index_fd = NULL;
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
        }
        if (stream->trace) {
                ctf_trace_put(stream->trace);
@@ -322,15 +312,7 @@ static void stream_release(struct urcu_ref *ref)
 void stream_put(struct relay_stream *stream)
 {
        DBG("stream put for stream id %" PRIu64, stream->stream_handle);
-       /*
-        * Ensure existence of stream->reflock for stream unlock.
-        */
        rcu_read_lock();
-       /*
-        * Stream reflock ensures that concurrent test and update of
-        * stream ref is atomic.
-        */
-       pthread_mutex_lock(&stream->reflock);
        assert(stream->ref.refcount != 0);
        /*
         * Wait until we have processed all the stream packets before
@@ -340,13 +322,20 @@ void stream_put(struct relay_stream *stream)
                        stream->stream_handle,
                        (int) stream->ref.refcount);
        urcu_ref_put(&stream->ref, stream_release);
-       pthread_mutex_unlock(&stream->reflock);
        rcu_read_unlock();
 }
 
 void try_stream_close(struct relay_stream *stream)
 {
+       bool session_aborted;
+       struct relay_session *session = stream->trace->session;
+
        DBG("Trying to close stream %" PRIu64, stream->stream_handle);
+
+       pthread_mutex_lock(&session->lock);
+       session_aborted = session->aborted;
+       pthread_mutex_unlock(&session->lock);
+
        pthread_mutex_lock(&stream->lock);
        /*
         * Can be called concurently by connection close and reception of last
@@ -377,6 +366,7 @@ void try_stream_close(struct relay_stream *stream)
                 * a packet. Since those are sent in that order, we take
                 * care of consumerd crashes.
                 */
+               DBG("relay_index_close_partial_fd");
                relay_index_close_partial_fd(stream);
                /*
                 * Use the highest net_seq_num we currently have pending
@@ -384,11 +374,13 @@ void try_stream_close(struct relay_stream *stream)
                 * at -1ULL if we cannot find any index.
                 */
                stream->last_net_seq_num = relay_index_find_last(stream);
+               DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
                /* Fall-through into the next check. */
        }
 
        if (stream->last_net_seq_num != -1ULL &&
-                       ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
+                       ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0
+                       && !session_aborted) {
                /*
                 * Don't close since we still have data pending. This
                 * handles cases where an explicit close command has
@@ -447,6 +439,10 @@ void print_relay_streams(void)
        struct lttng_ht_iter iter;
        struct relay_stream *stream;
 
+       if (!relay_streams_ht) {
+               return;
+       }
+
        rcu_read_lock();
        cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
                        node.node) {
This page took 0.026736 seconds and 5 git commands to generate.