fix
[lttng-tools.git] / src / common / consumer / consumer.c
index f0bacfb8ff0c897fd3259168a7733f8427243dc3..193b9ea1b1b5ae56ae4c5d918b7534d6e12d7f0a 100644 (file)
@@ -47,6 +47,7 @@
 #include <common/consumer/consumer-stream.h>
 #include <common/consumer/consumer-testpoint.h>
 #include <common/align.h>
+#include <common/consumer/consumer-metadata-cache.h>
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -66,13 +67,16 @@ struct consumer_channel_msg {
        uint64_t key;                           /* del */
 };
 
+/* Flag used to temporarily pause data consumption from testpoints. */
+int data_consumption_paused;
+
 /*
  * Flag to inform the polling thread to quit when all fd hung up. Updated by
  * the consumer_thread_receive_fds when it notices that all fds has hung up.
  * Also updated by the signal handler (consumer_should_exit()). Read by the
  * polling threads.
  */
-volatile int consumer_quit;
+int consumer_quit;
 
 /*
  * Global hash table containing respectively metadata and data streams. The
@@ -367,6 +371,9 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        if (channel->live_timer_enabled == 1) {
                consumer_timer_live_stop(channel);
        }
+       if (channel->monitor_timer_enabled == 1) {
+               consumer_timer_monitor_stop(channel);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -534,6 +541,13 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
        consumer_stream_destroy(stream, metadata_ht);
 }
 
+void consumer_stream_copy_ro_channel_values(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_channel *channel)
+{
+       stream->channel_ro_tracefile_size = channel->tracefile_size;
+       memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX);
+}
+
 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t stream_key,
                enum lttng_consumer_stream_state state,
@@ -570,7 +584,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->session_id = session_id;
        stream->monitor = monitor;
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
-       stream->index_fd = -1;
+       stream->index_file = NULL;
+       stream->last_sequence_number = -1ULL;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
@@ -1020,7 +1035,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
 
        CDS_INIT_LIST_HEAD(&channel->streams.head);
 
-       DBG("Allocated channel (key %" PRIu64 ")", channel->key)
+       DBG("Allocated channel (key %" PRIu64 ")", channel->key);
 
 end:
        return channel;
@@ -1219,7 +1234,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
 {
        ssize_t ret;
 
-       consumer_quit = 1;
+       CMM_STORE_SHARED(consumer_quit, 1);
        ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
        if (ret < 1) {
                PERROR("write consumer quit");
@@ -1228,9 +1243,15 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
        DBG("Consumer flag that it should quit");
 }
 
+
+/*
+ * Flush pending writes to trace output disk file.
+ */
+static
 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
                off_t orig_offset)
 {
+       int ret;
        int outfd = stream->out_fd;
 
        /*
@@ -1261,8 +1282,12 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
         * defined. So it can be expected to lead to lower throughput in
         * streaming.
         */
-       posix_fadvise(outfd, orig_offset - stream->max_sb_size,
+       ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size,
                        stream->max_sb_size, POSIX_FADV_DONTNEED);
+       if (ret && ret != -ENOSYS) {
+               errno = ret;
+               PERROR("posix_fadvise on fd %i", outfd);
+       }
 }
 
 /*
@@ -1336,6 +1361,8 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_metadata_pipe;
        }
 
+       ctx->channel_monitor_pipe = -1;
+
        return ctx;
 
 error_metadata_pipe:
@@ -1519,7 +1546,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                mmap_base = stream->mmap_base;
                ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
                if (ret < 0) {
-                       ret = -errno;
                        PERROR("tracer ctl get_mmap_read_offset");
                        goto end;
                }
@@ -1555,6 +1581,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                if (stream->metadata_flag) {
                        /* Metadata requires the control socket. */
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       if (stream->reset_metadata_flag) {
+                               ret = relayd_reset_metadata(&relayd->control_sock,
+                                               stream->relayd_stream_id,
+                                               stream->metadata_version);
+                               if (ret < 0) {
+                                       relayd_hang_up = 1;
+                                       goto write_error;
+                               }
+                               stream->reset_metadata_flag = 0;
+                       }
                        netlen += sizeof(struct lttcomm_relayd_metadata_payload);
                }
 
@@ -1578,6 +1614,15 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                /* No streaming, we have to set the len with the full padding */
                len += padding;
 
+               if (stream->metadata_flag && stream->reset_metadata_flag) {
+                       ret = utils_truncate_stream_file(stream->out_fd, 0);
+                       if (ret < 0) {
+                               ERR("Reset metadata file");
+                               goto end;
+                       }
+                       stream->reset_metadata_flag = 0;
+               }
+
                /*
                 * Check if we need to change the tracefile before writing the packet.
                 */
@@ -1595,15 +1640,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        }
                        outfd = stream->out_fd;
 
-                       if (stream->index_fd >= 0) {
-                               ret = index_create_file(stream->chan->pathname,
+                       if (stream->index_file) {
+                               lttng_index_file_put(stream->index_file);
+                               stream->index_file = lttng_index_file_create(stream->chan->pathname,
                                                stream->name, stream->uid, stream->gid,
                                                stream->chan->tracefile_size,
-                                               stream->tracefile_count_current);
-                               if (ret < 0) {
+                                               stream->tracefile_count_current,
+                                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                               if (!stream->index_file) {
                                        goto end;
                                }
-                               stream->index_fd = ret;
                        }
 
                        /* Reset current size because we just perform a rotation. */
@@ -1656,8 +1702,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                lttng_sync_file_range(outfd, stream->out_fd_offset, len,
                                SYNC_FILE_RANGE_WRITE);
                stream->out_fd_offset += len;
+               lttng_consumer_sync_trace_file(stream, orig_offset);
        }
-       lttng_consumer_sync_trace_file(stream, orig_offset);
 
 write_error:
        /*
@@ -1737,6 +1783,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                         */
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
 
+                       if (stream->reset_metadata_flag) {
+                               ret = relayd_reset_metadata(&relayd->control_sock,
+                                               stream->relayd_stream_id,
+                                               stream->metadata_version);
+                               if (ret < 0) {
+                                       relayd_hang_up = 1;
+                                       goto write_error;
+                               }
+                               stream->reset_metadata_flag = 0;
+                       }
                        ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
                                        padding);
                        if (ret < 0) {
@@ -1760,6 +1816,14 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                /* No streaming, we have to set the len with the full padding */
                len += padding;
 
+               if (stream->metadata_flag && stream->reset_metadata_flag) {
+                       ret = utils_truncate_stream_file(stream->out_fd, 0);
+                       if (ret < 0) {
+                               ERR("Reset metadata file");
+                               goto end;
+                       }
+                       stream->reset_metadata_flag = 0;
+               }
                /*
                 * Check if we need to change the tracefile before writing the packet.
                 */
@@ -1778,22 +1842,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        }
                        outfd = stream->out_fd;
 
-                       if (stream->index_fd >= 0) {
-                               ret = close(stream->index_fd);
-                               if (ret < 0) {
-                                       PERROR("Closing index");
-                                       goto end;
-                               }
-                               stream->index_fd = -1;
-                               ret = index_create_file(stream->chan->pathname,
+                       if (stream->index_file) {
+                               lttng_index_file_put(stream->index_file);
+                               stream->index_file = lttng_index_file_create(stream->chan->pathname,
                                                stream->name, stream->uid, stream->gid,
                                                stream->chan->tracefile_size,
-                                               stream->tracefile_count_current);
-                               if (ret < 0) {
-                                       written = ret;
+                                               stream->tracefile_count_current,
+                                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                               if (!stream->index_file) {
                                        goto end;
                                }
-                               stream->index_fd = ret;
                        }
 
                        /* Reset current size because we just perform a rotation. */
@@ -1868,7 +1926,9 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                stream->output_written += ret_splice;
                written += ret_splice;
        }
-       lttng_consumer_sync_trace_file(stream, orig_offset);
+       if (!relayd) {
+               lttng_consumer_sync_trace_file(stream, orig_offset);
+       }
        goto end;
 
 write_error:
@@ -1905,6 +1965,25 @@ end:
        return written;
 }
 
+/*
+ * Sample the snapshot positions for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_sample_snapshot_positions(stream);
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               return lttng_ustconsumer_sample_snapshot_positions(stream);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
 /*
  * Take a snapshot for a specific fd
  *
@@ -1946,6 +2025,27 @@ int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
        }
 }
 
+/*
+ * Get the consumed position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+               unsigned long *pos)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_get_consumed_snapshot(stream, pos);
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
+
 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
@@ -2008,6 +2108,10 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
+       if (stream->chan->metadata_cache) {
+               /* Only applicable to userspace consumers. */
+               pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+       }
 
        /* Remove any reference to that stream. */
        consumer_stream_delete(stream, ht);
@@ -2031,6 +2135,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
         */
        stream->chan->metadata_stream = NULL;
 
+       if (stream->chan->metadata_cache) {
+               pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+       }
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
@@ -2217,10 +2324,10 @@ restart:
                DBG("Metadata poll return from wait with %d fd(s)",
                                LTTNG_POLL_GETNB(&events));
                health_poll_exit();
-               DBG("Metadata event catched in thread");
+               DBG("Metadata event caught in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
-                               ERR("Poll EINTR catched");
+                               ERR("Poll EINTR caught");
                                goto restart;
                        }
                        if (LTTNG_POLL_GETNB(&events) == 0) {
@@ -2318,7 +2425,7 @@ restart:
                                        len = ctx->on_buffer_ready(stream, ctx);
                                        /*
                                         * We don't check the return value here since if we get
-                                        * a negative len, it means an error occured thus we
+                                        * a negative len, it means an error occurred thus we
                                         * simply remove it from the poll set and free the
                                         * stream.
                                         */
@@ -2345,7 +2452,7 @@ restart:
                                                len = ctx->on_buffer_ready(stream, ctx);
                                                /*
                                                 * We don't check the return value here since if we get
-                                                * a negative len, it means an error occured thus we
+                                                * a negative len, it means an error occurred thus we
                                                 * simply remove it from the poll set and free the
                                                 * stream.
                                                 */
@@ -2466,13 +2573,16 @@ void *consumer_thread_data_poll(void *data)
                pthread_mutex_unlock(&consumer_data.lock);
 
                /* No FDs and consumer_quit, consumer_cleanup the thread */
-               if (nb_fd == 0 && consumer_quit == 1) {
+               if (nb_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
                        err = 0;        /* All is OK */
                        goto end;
                }
                /* poll on the array of fds */
        restart:
                DBG("polling on %d fd", nb_fd + 2);
+               if (testpoint(consumerd_thread_data_poll)) {
+                       goto end;
+               }
                health_poll_entry();
                num_rdy = poll(pollfd, nb_fd + 2, -1);
                health_poll_exit();
@@ -2492,6 +2602,12 @@ void *consumer_thread_data_poll(void *data)
                        goto end;
                }
 
+               if (caa_unlikely(data_consumption_paused)) {
+                       DBG("Data consumption paused, sleeping...");
+                       sleep(1);
+                       goto restart;
+               }
+
                /*
                 * If the consumer_data_pipe triggered poll go directly to the
                 * beginning of the loop to update the array. We want to prioritize
@@ -2800,10 +2916,10 @@ restart:
                DBG("Channel poll return from wait with %d fd(s)",
                                LTTNG_POLL_GETNB(&events));
                health_poll_exit();
-               DBG("Channel event catched in thread");
+               DBG("Channel event caught in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
-                               ERR("Poll EINTR catched");
+                               ERR("Poll EINTR caught");
                                goto restart;
                        }
                        if (LTTNG_POLL_GETNB(&events) == 0) {
@@ -3134,7 +3250,7 @@ void *consumer_thread_sessiond_poll(void *data)
                        err = 0;
                        goto end;
                }
-               if (consumer_quit) {
+               if (CMM_LOAD_SHARED(consumer_quit)) {
                        DBG("consumer_thread_receive_fds received quit from signal");
                        err = 0;        /* All is OK */
                        goto end;
@@ -3159,7 +3275,7 @@ end:
         * when all fds have hung up, the polling thread
         * can exit cleanly
         */
-       consumer_quit = 1;
+       CMM_STORE_SHARED(consumer_quit, 1);
 
        /*
         * Notify the data poll thread to poll back again and test the
@@ -3196,10 +3312,79 @@ error_testpoint:
        return NULL;
 }
 
+static
+int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
+               uint64_t key)
+{
+       int ret;
+
+       do {
+               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
+       } while (ret == -1 && errno == EINTR);
+       if (ret == -1) {
+               PERROR("write to the channel rotate pipe");
+       } else {
+               DBG("Sent channel rotation notification for channel key %"
+                               PRIu64, key);
+       }
+
+       return ret;
+}
+
+/*
+ * Perform operations that need to be done after a stream has
+ * rotated and released the stream lock.
+ *
+ * Multiple rotations cannot occur simultaneously, so we know the state of the
+ * "rotated" stream flag cannot change.
+ *
+ * This MUST be called WITHOUT the stream lock held.
+ */
+static
+int consumer_post_rotation(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+
+       if (!stream->rotated) {
+               goto end;
+       }
+
+       pthread_mutex_lock(&stream->chan->lock);
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               /*
+                * The ust_metadata_pushed counter has been reset to 0, so now
+                * we can wakeup the metadata thread so it dumps the metadata
+                * cache to the new file.
+                */
+               if (stream->metadata_flag) {
+                       consumer_metadata_wakeup_pipe(stream->chan);
+               }
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+       if (--stream->chan->nr_stream_rotate_pending == 0) {
+               ret = rotate_notify_sessiond(ctx, stream->chan->key);
+       }
+       pthread_mutex_unlock(&stream->chan->lock);
+       stream->rotated = 0;
+
+end:
+       return ret;
+}
+
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        ssize_t ret;
+       int rotate_ret;
 
        pthread_mutex_lock(&stream->lock);
        if (stream->metadata_flag) {
@@ -3226,6 +3411,13 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                pthread_mutex_unlock(&stream->metadata_rdv_lock);
        }
        pthread_mutex_unlock(&stream->lock);
+
+       rotate_ret = consumer_post_rotation(stream, ctx);
+       if (rotate_ret < 0) {
+               ERR("Failed after a rotation");
+               ret = -1;
+       }
+
        return ret;
 }
 
@@ -3291,7 +3483,7 @@ error:
  * This will create a relayd socket pair and add it to the relayd hash table.
  * The caller MUST acquire a RCU read side lock before calling it.
  */
-int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
+ void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll,
                struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
@@ -3313,7 +3505,6 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                /* Not found. Allocate one. */
                relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
                if (relayd == NULL) {
-                       ret = -ENOMEM;
                        ret_code = LTTCOMM_CONSUMERD_ENOMEM;
                        goto error;
                } else {
@@ -3346,14 +3537,12 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        if (ret) {
                /* Needing to exit in the middle of a command: error. */
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
-               ret = -EINTR;
                goto error_nosignal;
        }
 
        /* Get relayd socket from session daemon */
        ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
        if (ret != sizeof(fd)) {
-               ret = -1;
                fd = -1;        /* Just in case it gets set with an invalid value. */
 
                /*
@@ -3427,7 +3616,6 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                break;
        default:
                ERR("Unknown relayd socket type (%d)", sock_type);
-               ret = -1;
                ret_code = LTTCOMM_CONSUMERD_FATAL;
                goto error;
        }
@@ -3451,7 +3639,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        add_relayd(relayd);
 
        /* All good! */
-       return 0;
+       return;
 
 error:
        if (consumer_send_status_msg(sock, ret_code) < 0) {
@@ -3469,8 +3657,6 @@ error_nosignal:
        if (relayd_created) {
                free(relayd);
        }
-
-       return ret;
 }
 
 /*
@@ -3716,3 +3902,297 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        }
        return start_pos;
 }
+
+static
+int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               ret = kernctl_buffer_flush(stream->wait_fd);
+               if (ret < 0) {
+                       ERR("Failed to flush kernel stream");
+                       goto end;
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustctl_flush_buffer(stream, producer_active);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Sample the rotate position for all the streams of a channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_channel(uint64_t key, char *path,
+               uint64_t relayd_id, uint32_t metadata,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht_iter iter;
+       struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+       DBG("Kernel consumer sample rotate position for channel %" PRIu64, key);
+
+       rcu_read_lock();
+
+       channel = consumer_find_channel(key);
+       if (!channel) {
+               ERR("No channel found for key %" PRIu64, key);
+               ret = -1;
+               goto end;
+       }
+       pthread_mutex_lock(&channel->lock);
+       snprintf(channel->pathname, PATH_MAX, "%s", path);
+
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key, &iter.iter,
+                       stream, node_channel_id.node) {
+               health_code_update();
+
+               /*
+                * Lock stream because we are about to change its state.
+                */
+               pthread_mutex_lock(&stream->lock);
+               memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX);
+               ret = lttng_consumer_sample_snapshot_positions(stream);
+               if (ret < 0) {
+                       ERR("Taking kernel snapshot positions");
+                       goto end_unlock;
+               } else {
+                       uint64_t consumed_pos;
+
+                       ret = lttng_consumer_get_produced_snapshot(stream,
+                                       &stream->rotate_position);
+                       if (ret < 0) {
+                               ERR("Produced kernel snapshot position");
+                               goto end_unlock;
+                       }
+                       fprintf(stderr, "Stream %lu should rotate after %lu to %s\n",
+                                       stream->key, stream->rotate_position,
+                                       channel->pathname);
+                       lttng_consumer_get_consumed_snapshot(stream,
+                                       &consumed_pos);
+                       fprintf(stderr, "consumed %lu\n", consumed_pos);
+                       if (consumed_pos == stream->rotate_position) {
+                               stream->rotate_ready = 1;
+                               fprintf(stderr, "Stream %lu ready to rotate to %s\n",
+                                               stream->key, channel->pathname);
+                       }
+               }
+               channel->nr_stream_rotate_pending++;
+
+               ret = consumer_flush_buffer(stream, 1);
+               if (ret < 0) {
+                       ERR("Failed to flush stream");
+                       goto end_unlock;
+               }
+
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       ret = 0;
+       goto end_unlock_channel;
+
+end_unlock:
+       pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
+       pthread_mutex_unlock(&channel->lock);
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the stream and channel locks held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+       unsigned long consumed_pos;
+
+       if (!stream->rotate_position && !stream->rotate_ready) {
+               ret = 0;
+               goto end;
+       }
+
+       /*
+        * If we don't have the rotate_ready flag, check the consumed position
+        * to determine if we need to rotate.
+        */
+       if (!stream->rotate_ready) {
+               ret = lttng_consumer_sample_snapshot_positions(stream);
+               if (ret < 0) {
+                       ERR("Taking kernel snapshot positions");
+                       goto error;
+               }
+
+               ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
+               if (ret < 0) {
+                       ERR("Produced kernel snapshot position");
+                       goto error;
+               }
+
+               fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
+               /* Rotate position not reached yet. */
+               if (consumed_pos < stream->rotate_position) {
+                       ret = 0;
+                       goto end;
+               }
+               fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
+                               consumed_pos, stream->rotate_position, stream->key);
+       } else {
+               fprintf(stderr, "Rotate position reached for stream %lu\n",
+                               stream->key);
+       }
+
+       ret = close(stream->out_fd);
+       if (ret < 0) {
+               PERROR("Closing tracefile");
+               goto error;
+       }
+
+       fprintf(stderr, "Rotating stream %lu to %s/%s\n", stream->key,
+                       stream->channel_ro_pathname, stream->name);
+       ret = utils_create_stream_file(stream->channel_ro_pathname, stream->name,
+                       stream->channel_ro_tracefile_size, stream->tracefile_count_current,
+                       stream->uid, stream->gid, NULL);
+       if (ret < 0) {
+               goto error;
+       }
+       stream->out_fd = ret;
+       stream->tracefile_size_current = 0;
+
+       if (!stream->metadata_flag) {
+               struct lttng_index_file *index_file;
+
+               lttng_index_file_put(stream->index_file);
+
+               index_file = lttng_index_file_create(stream->channel_ro_pathname,
+                               stream->name, stream->uid, stream->gid,
+                               stream->channel_ro_tracefile_size,
+                               stream->tracefile_count_current,
+                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+               if (!index_file) {
+                       goto error;
+               }
+               stream->index_file = index_file;
+               stream->out_fd_offset = 0;
+       } else {
+               switch (consumer_data.type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       /*
+                        * Reset the position of what has been read from the metadata
+                        * cache to 0 so we can dump it again.
+                        */
+                       ret = kernctl_metadata_cache_dump(stream->wait_fd);
+                       if (ret < 0) {
+                               ERR("Failed to dump the kernel metadata cache after rotation");
+                               goto error;
+                       }
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * Reset the position pushed from the metadata cache so it
+                        * will write from the beginning on the next push.
+                        */
+                       stream->ust_metadata_pushed = 0;
+                       break;
+               default:
+                       ERR("Unknown consumer_data type");
+                       abort();
+               }
+       }
+
+       stream->rotate_position = 0;
+       stream->rotate_ready = 0;
+       stream->rotated = 1;
+
+       ret = 0;
+       goto end;
+
+error:
+       ret = -1;
+end:
+       return ret;
+}
+
+/*
+ * Rotate all the ready streams.
+ *
+ * This is especially important for low throughput streams that have already
+ * been consumed, we cannot wait for their next packet to perform the
+ * rotation.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_ready_streams(uint64_t key,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht_iter iter;
+       struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+       rcu_read_lock();
+
+       channel = consumer_find_channel(key);
+       if (!channel) {
+               ERR("No channel found for key %" PRIu64, key);
+               ret = -1;
+               goto end;
+       }
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key, &iter.iter,
+                       stream, node_channel_id.node) {
+               health_code_update();
+
+               /*
+                * Lock stream because we are about to change its state.
+                */
+               pthread_mutex_lock(&stream->lock);
+               if (stream->rotate_ready == 0) {
+                       pthread_mutex_unlock(&stream->lock);
+                       continue;
+               }
+               ret = lttng_consumer_rotate_stream(ctx, stream);
+               if (ret < 0) {
+                       pthread_mutex_unlock(&stream->lock);
+                       ERR("Stream rotation error");
+                       goto end;
+               }
+
+               pthread_mutex_unlock(&stream->lock);
+               ret = consumer_post_rotation(stream, ctx);
+               if (ret < 0) {
+                       ERR("Failed after a rotation");
+                       goto end;
+               }
+       }
+
+       ret = 0;
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
This page took 0.037562 seconds and 5 git commands to generate.