Fix: reset current size for tracefile rotation
[lttng-tools.git] / src / common / consumer.c
index a9070b1c9bc3f976ef26a2fbbc183c7fd4b75343..01266a700cb37199966c9257eba335689e877453 100644 (file)
@@ -50,12 +50,14 @@ struct lttng_consumer_global_data consumer_data = {
 
 enum consumer_channel_action {
        CONSUMER_CHANNEL_ADD,
+       CONSUMER_CHANNEL_DEL,
        CONSUMER_CHANNEL_QUIT,
 };
 
 struct consumer_channel_msg {
        enum consumer_channel_action action;
-       struct lttng_consumer_channel *chan;
+       struct lttng_consumer_channel *chan;    /* add */
+       uint64_t key;                           /* del */
 };
 
 /*
@@ -91,6 +93,7 @@ static void notify_thread_pipe(int wpipe)
 
 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *chan,
+               uint64_t key,
                enum consumer_channel_action action)
 {
        struct consumer_channel_msg msg;
@@ -103,8 +106,15 @@ static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
        } while (ret < 0 && errno == EINTR);
 }
 
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
+               uint64_t key)
+{
+       notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
+}
+
 static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel **chan,
+               uint64_t *key,
                enum consumer_channel_action *action)
 {
        struct consumer_channel_msg msg;
@@ -116,6 +126,7 @@ static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
        if (ret > 0) {
                *action = msg.action;
                *chan = msg.chan;
+               *key = msg.key;
        }
        return ret;
 }
@@ -319,9 +330,9 @@ static void cleanup_relayd_ht(void)
                destroy_relayd(relayd);
        }
 
-       lttng_ht_destroy(consumer_data.relayd_ht);
-
        rcu_read_unlock();
+
+       lttng_ht_destroy(consumer_data.relayd_ht);
 }
 
 /*
@@ -516,8 +527,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
        }
        rcu_read_unlock();
 
-       uatomic_dec(&stream->chan->refcount);
-       if (!uatomic_read(&stream->chan->refcount)
+       if (!uatomic_sub_return(&stream->chan->refcount, 1)
                        && !uatomic_read(&stream->chan->nb_init_stream_left)) {
                free_chan = stream->chan;
        }
@@ -659,6 +669,8 @@ static int add_stream(struct lttng_consumer_stream *stream,
         * stream.
         */
        if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
+               /* Increment refcount before decrementing nb_init_stream_left */
+               cmm_smp_wmb();
                uatomic_dec(&stream->chan->nb_init_stream_left);
        }
 
@@ -719,6 +731,8 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
        obj->net_seq_idx = net_seq_idx;
        obj->refcount = 0;
        obj->destroy_flag = 0;
+       obj->control_sock.sock.fd = -1;
+       obj->data_sock.sock.fd = -1;
        lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
        pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
 
@@ -899,7 +913,7 @@ end:
 
        if (!ret && channel->wait_fd != -1 &&
                        channel->metadata_stream == NULL) {
-               notify_channel_pipe(ctx, channel, CONSUMER_CHANNEL_ADD);
+               notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
        }
        return ret;
 }
@@ -1287,99 +1301,6 @@ end:
        return ret;
 }
 
-/*
- * Create the tracefile on disk.
- *
- * Return 0 on success or else a negative value.
- */
-int lttng_create_output_file(struct lttng_consumer_stream *stream)
-{
-       int ret;
-       char full_path[PATH_MAX];
-       char *path_name_id = NULL;
-       char *path;
-
-       assert(stream);
-
-       /* Don't create anything if this is set for streaming. */
-       if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ret = 0;
-               goto end;
-       }
-
-       ret = snprintf(full_path, sizeof(full_path), "%s/%s",
-                       stream->chan->pathname, stream->name);
-       if (ret < 0) {
-               PERROR("snprintf create output file");
-               goto error;
-       }
-
-       /*
-        * If we split the trace in multiple files, we have to add the tracefile
-        * current count at the end of the tracefile name
-        */
-       if (stream->chan->tracefile_size > 0) {
-               ret = asprintf(&path_name_id, "%s_%" PRIu64, full_path,
-                               stream->tracefile_count_current);
-               if (ret < 0) {
-                       PERROR("Allocating path name ID");
-                       goto error;
-               }
-               path = path_name_id;
-       } else {
-               path = full_path;
-       }
-
-       ret = run_as_open(path, O_WRONLY | O_CREAT | O_TRUNC,
-                       S_IRWXU | S_IRWXG | S_IRWXO, stream->uid, stream->gid);
-       if (ret < 0) {
-               PERROR("open stream path %s", path);
-               goto error_open;
-       }
-       stream->out_fd = ret;
-       stream->tracefile_size_current = 0;
-
-error_open:
-       free(path_name_id);
-error:
-end:
-       return ret;
-}
-
-/*
- * Change the output tracefile according to the tracefile_size and
- * tracefile_count parameters. The stream lock MUST be held before calling this
- * function because we are modifying the stream status.
- *
- * Return 0 on success or else a negative value.
- */
-static int rotate_output_file(struct lttng_consumer_stream *stream)
-{
-       int ret;
-
-       assert(stream);
-       assert(stream->tracefile_size_current);
-
-       ret = close(stream->out_fd);
-       if (ret < 0) {
-               PERROR("Closing tracefile");
-               goto end;
-       }
-
-       if (stream->chan->tracefile_count > 0) {
-               stream->tracefile_count_current =
-                       (stream->tracefile_count_current + 1) %
-                       stream->chan->tracefile_count;
-       } else {
-               stream->tracefile_count_current++;
-       }
-
-       return lttng_create_output_file(stream);
-
-end:
-       return ret;
-}
-
 /*
  * Mmap the ring buffer, read it and write the data to the tracefile. This is a
  * core function for writing trace buffers to either the local filesystem or
@@ -1494,12 +1415,17 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                if (stream->chan->tracefile_size > 0 &&
                                (stream->tracefile_size_current + len) >
                                stream->chan->tracefile_size) {
-                       ret = rotate_output_file(stream);
+                       ret = utils_rotate_stream_file(stream->chan->pathname,
+                                       stream->name, stream->chan->tracefile_size,
+                                       stream->chan->tracefile_count, stream->uid, stream->gid,
+                                       stream->out_fd, &(stream->tracefile_count_current));
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd;
+                       outfd = stream->out_fd = ret;
+                       /* Reset current size because we just perform a rotation. */
+                       stream->tracefile_size_current = 0;
                }
                stream->tracefile_size_current += len;
        }
@@ -1671,12 +1597,17 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                if (stream->chan->tracefile_size > 0 &&
                                (stream->tracefile_size_current + len) >
                                stream->chan->tracefile_size) {
-                       ret = rotate_output_file(stream);
+                       ret = utils_rotate_stream_file(stream->chan->pathname,
+                                       stream->name, stream->chan->tracefile_size,
+                                       stream->chan->tracefile_count, stream->uid, stream->gid,
+                                       stream->out_fd, &(stream->tracefile_count_current));
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd;
+                       outfd = stream->out_fd = ret;
+                       /* Reset current size because we just perform a rotation. */
+                       stream->tracefile_size_current = 0;
                }
                stream->tracefile_size_current += len;
        }
@@ -2024,8 +1955,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        rcu_read_unlock();
 
        /* Atomically decrement channel refcount since other threads can use it. */
-       uatomic_dec(&stream->chan->refcount);
-       if (!uatomic_read(&stream->chan->refcount)
+       if (!uatomic_sub_return(&stream->chan->refcount, 1)
                        && !uatomic_read(&stream->chan->nb_init_stream_left)) {
                /* Go for channel deletion! */
                free_chan = stream->chan;
@@ -2095,6 +2025,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
         * stream.
         */
        if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
+               /* Increment refcount before decrementing nb_init_stream_left */
+               cmm_smp_wmb();
                uatomic_dec(&stream->chan->nb_init_stream_left);
        }
 
@@ -2644,6 +2576,13 @@ void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
                        ht->hash_fct(&channel->key, lttng_ht_seed),
                        ht->match_fct, &channel->key,
                        &iter.iter, stream, node_channel_id.node) {
+               /*
+                * Protect against teardown with mutex.
+                */
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
                switch (consumer_data.type) {
                case LTTNG_CONSUMER_KERNEL:
                        break;
@@ -2660,6 +2599,8 @@ void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
                        ERR("Unknown consumer_data type");
                        assert(0);
                }
+       next:
+               pthread_mutex_unlock(&stream->lock);
        }
        rcu_read_unlock();
 }
@@ -2767,8 +2708,9 @@ restart:
                                        continue;
                                } else if (revents & LPOLLIN) {
                                        enum consumer_channel_action action;
+                                       uint64_t key;
 
-                                       ret = read_channel_pipe(ctx, &chan, &action);
+                                       ret = read_channel_pipe(ctx, &chan, &key, &action);
                                        if (ret <= 0) {
                                                ERR("Error reading channel pipe");
                                                continue;
@@ -2787,6 +2729,27 @@ restart:
                                                lttng_poll_add(&events, chan->wait_fd,
                                                                LPOLLIN | LPOLLPRI);
                                                break;
+                                       case CONSUMER_CHANNEL_DEL:
+                                       {
+                                               chan = consumer_find_channel(key);
+                                               if (!chan) {
+                                                       ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
+                                                       break;
+                                               }
+                                               lttng_poll_del(&events, chan->wait_fd);
+                                               ret = lttng_ht_del(channel_ht, &iter);
+                                               assert(ret == 0);
+                                               consumer_close_channel_streams(chan);
+
+                                               /*
+                                                * Release our own refcount. Force channel deletion even if
+                                                * streams were not initialized.
+                                                */
+                                               if (!uatomic_sub_return(&chan->refcount, 1)) {
+                                                       consumer_del_channel(chan);
+                                               }
+                                               goto restart;
+                                       }
                                        case CONSUMER_CHANNEL_QUIT:
                                                /*
                                                 * Remove the pipe from the poll set and continue the loop
@@ -2824,6 +2787,12 @@ restart:
                                ret = lttng_ht_del(channel_ht, &iter);
                                assert(ret == 0);
                                consumer_close_channel_streams(chan);
+
+                               /* Release our own refcount */
+                               if (!uatomic_sub_return(&chan->refcount, 1)
+                                               && !uatomic_read(&chan->nb_init_stream_left)) {
+                                       consumer_del_channel(chan);
+                               }
                        }
 
                        /* Release RCU lock for the channel looked up */
@@ -3001,7 +2970,7 @@ end:
         */
        notify_thread_pipe(ctx->consumer_data_pipe[1]);
 
-       notify_channel_pipe(ctx, NULL, CONSUMER_CHANNEL_QUIT);
+       notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
 
        /* Cleaning up possibly open sockets. */
        if (sock >= 0) {
@@ -3086,7 +3055,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
 {
        int fd = -1, ret = -1, relayd_created = 0;
        enum lttng_error_code ret_code = LTTNG_OK;
-       struct consumer_relayd_sock_pair *relayd;
+       struct consumer_relayd_sock_pair *relayd = NULL;
 
        assert(ctx);
        assert(relayd_sock);
This page took 0.028656 seconds and 5 git commands to generate.