Backport: relayd: replace lttng_index_file with relay_index_file
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index b604919a78b376062cc52ed8a0a5b189d0be3143..b033dda94d1f39baf70d52ff50a43a7dc6ed797c 100644 (file)
@@ -17,7 +17,6 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <common/common.h>
 #include <common/utils.h>
@@ -124,13 +123,8 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
         * No need to use run_as API here because whatever we receive,
         * the relayd uses its own credentials for the stream files.
         */
-       ret = utils_create_stream_file(stream->path_name, stream->channel_name,
-                       stream->tracefile_size, 0, -1, -1, NULL);
-       if (ret < 0) {
-               ERR("Create output file");
-               goto end;
-       }
-       stream->stream_fd = stream_fd_create(ret);
+       stream->stream_fd = stream_fd_create(stream->path_name,
+                       stream->channel_name, stream->tracefile_size, 0, NULL);
        if (!stream->stream_fd) {
                if (close(ret)) {
                        PERROR("Error closing file %d", ret);
@@ -149,7 +143,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
                DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
        }
 
-       if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
+       if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, LTTNG_NAME_MAX)) {
                stream->is_metadata = 1;
        }
 
@@ -253,6 +247,11 @@ static void stream_unpublish(struct relay_stream *stream)
 static void stream_destroy(struct relay_stream *stream)
 {
        if (stream->indexes_ht) {
+               /*
+                * Calling lttng_ht_destroy in call_rcu worker thread so
+                * we don't hold the RCU read-side lock while calling
+                * it.
+                */
                lttng_ht_destroy(stream->indexes_ht);
        }
        if (stream->tfa) {
@@ -302,9 +301,9 @@ static void stream_release(struct urcu_ref *ref)
                stream_fd_put(stream->stream_fd);
                stream->stream_fd = NULL;
        }
-       if (stream->index_fd) {
-               stream_fd_put(stream->index_fd);
-               stream->index_fd = NULL;
+       if (stream->index_file) {
+               relay_index_file_put(stream->index_file);
+               stream->index_file = NULL;
        }
        if (stream->trace) {
                ctf_trace_put(stream->trace);
@@ -341,7 +340,15 @@ void stream_put(struct relay_stream *stream)
 
 void try_stream_close(struct relay_stream *stream)
 {
+       bool session_aborted;
+       struct relay_session *session = stream->trace->session;
+
        DBG("Trying to close stream %" PRIu64, stream->stream_handle);
+
+       pthread_mutex_lock(&session->lock);
+       session_aborted = session->aborted;
+       pthread_mutex_unlock(&session->lock);
+
        pthread_mutex_lock(&stream->lock);
        /*
         * Can be called concurently by connection close and reception of last
@@ -372,6 +379,7 @@ void try_stream_close(struct relay_stream *stream)
                 * a packet. Since those are sent in that order, we take
                 * care of consumerd crashes.
                 */
+               DBG("relay_index_close_partial_fd");
                relay_index_close_partial_fd(stream);
                /*
                 * Use the highest net_seq_num we currently have pending
@@ -379,11 +387,13 @@ void try_stream_close(struct relay_stream *stream)
                 * at -1ULL if we cannot find any index.
                 */
                stream->last_net_seq_num = relay_index_find_last(stream);
+               DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
                /* Fall-through into the next check. */
        }
 
        if (stream->last_net_seq_num != -1ULL &&
-                       ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
+                       ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0
+                       && !session_aborted) {
                /*
                 * Don't close since we still have data pending. This
                 * handles cases where an explicit close command has
@@ -442,6 +452,10 @@ void print_relay_streams(void)
        struct lttng_ht_iter iter;
        struct relay_stream *stream;
 
+       if (!relay_streams_ht) {
+               return;
+       }
+
        rcu_read_lock();
        cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
                        node.node) {
This page took 0.027513 seconds and 5 git commands to generate.