fix
[lttng-tools.git] / src / common / consumer / consumer.c
index a8c369904ec35688a3467ab383c9c8f05cc23e7e..193b9ea1b1b5ae56ae4c5d918b7534d6e12d7f0a 100644 (file)
@@ -541,6 +541,13 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
        consumer_stream_destroy(stream, metadata_ht);
 }
 
+void consumer_stream_copy_ro_channel_values(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_channel *channel)
+{
+       stream->channel_ro_tracefile_size = channel->tracefile_size;
+       memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX);
+}
+
 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t stream_key,
                enum lttng_consumer_stream_state state,
@@ -1958,6 +1965,25 @@ end:
        return written;
 }
 
+/*
+ * Sample the snapshot positions for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_sample_snapshot_positions(stream);
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               return lttng_ustconsumer_sample_snapshot_positions(stream);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
 /*
  * Take a snapshot for a specific fd
  *
@@ -1999,6 +2025,27 @@ int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
        }
 }
 
+/*
+ * Get the consumed position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+               unsigned long *pos)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_get_consumed_snapshot(stream, pos);
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
+
 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
@@ -3265,10 +3312,79 @@ error_testpoint:
        return NULL;
 }
 
+static
+int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
+               uint64_t key)
+{
+       int ret;
+
+       do {
+               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
+       } while (ret == -1 && errno == EINTR);
+       if (ret == -1) {
+               PERROR("write to the channel rotate pipe");
+       } else {
+               DBG("Sent channel rotation notification for channel key %"
+                               PRIu64, key);
+       }
+
+       return ret;
+}
+
+/*
+ * Perform operations that need to be done after a stream has
+ * rotated and released the stream lock.
+ *
+ * Multiple rotations cannot occur simultaneously, so we know the state of the
+ * "rotated" stream flag cannot change.
+ *
+ * This MUST be called WITHOUT the stream lock held.
+ */
+static
+int consumer_post_rotation(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+
+       if (!stream->rotated) {
+               goto end;
+       }
+
+       pthread_mutex_lock(&stream->chan->lock);
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               /*
+                * The ust_metadata_pushed counter has been reset to 0, so now
+                * we can wakeup the metadata thread so it dumps the metadata
+                * cache to the new file.
+                */
+               if (stream->metadata_flag) {
+                       consumer_metadata_wakeup_pipe(stream->chan);
+               }
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+       if (--stream->chan->nr_stream_rotate_pending == 0) {
+               ret = rotate_notify_sessiond(ctx, stream->chan->key);
+       }
+       pthread_mutex_unlock(&stream->chan->lock);
+       stream->rotated = 0;
+
+end:
+       return ret;
+}
+
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        ssize_t ret;
+       int rotate_ret;
 
        pthread_mutex_lock(&stream->lock);
        if (stream->metadata_flag) {
@@ -3295,6 +3411,13 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                pthread_mutex_unlock(&stream->metadata_rdv_lock);
        }
        pthread_mutex_unlock(&stream->lock);
+
+       rotate_ret = consumer_post_rotation(stream, ctx);
+       if (rotate_ret < 0) {
+               ERR("Failed after a rotation");
+               ret = -1;
+       }
+
        return ret;
 }
 
@@ -3779,3 +3902,297 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        }
        return start_pos;
 }
+
+static
+int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               ret = kernctl_buffer_flush(stream->wait_fd);
+               if (ret < 0) {
+                       ERR("Failed to flush kernel stream");
+                       goto end;
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustctl_flush_buffer(stream, producer_active);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Sample the rotate position for all the streams of a channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_channel(uint64_t key, char *path,
+               uint64_t relayd_id, uint32_t metadata,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht_iter iter;
+       struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+       DBG("Kernel consumer sample rotate position for channel %" PRIu64, key);
+
+       rcu_read_lock();
+
+       channel = consumer_find_channel(key);
+       if (!channel) {
+               ERR("No channel found for key %" PRIu64, key);
+               ret = -1;
+               goto end;
+       }
+       pthread_mutex_lock(&channel->lock);
+       snprintf(channel->pathname, PATH_MAX, "%s", path);
+
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key, &iter.iter,
+                       stream, node_channel_id.node) {
+               health_code_update();
+
+               /*
+                * Lock stream because we are about to change its state.
+                */
+               pthread_mutex_lock(&stream->lock);
+               memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX);
+               ret = lttng_consumer_sample_snapshot_positions(stream);
+               if (ret < 0) {
+                       ERR("Taking kernel snapshot positions");
+                       goto end_unlock;
+               } else {
+                       uint64_t consumed_pos;
+
+                       ret = lttng_consumer_get_produced_snapshot(stream,
+                                       &stream->rotate_position);
+                       if (ret < 0) {
+                               ERR("Produced kernel snapshot position");
+                               goto end_unlock;
+                       }
+                       fprintf(stderr, "Stream %lu should rotate after %lu to %s\n",
+                                       stream->key, stream->rotate_position,
+                                       channel->pathname);
+                       lttng_consumer_get_consumed_snapshot(stream,
+                                       &consumed_pos);
+                       fprintf(stderr, "consumed %lu\n", consumed_pos);
+                       if (consumed_pos == stream->rotate_position) {
+                               stream->rotate_ready = 1;
+                               fprintf(stderr, "Stream %lu ready to rotate to %s\n",
+                                               stream->key, channel->pathname);
+                       }
+               }
+               channel->nr_stream_rotate_pending++;
+
+               ret = consumer_flush_buffer(stream, 1);
+               if (ret < 0) {
+                       ERR("Failed to flush stream");
+                       goto end_unlock;
+               }
+
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       ret = 0;
+       goto end_unlock_channel;
+
+end_unlock:
+       pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
+       pthread_mutex_unlock(&channel->lock);
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the stream and channel locks held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+       unsigned long consumed_pos;
+
+       if (!stream->rotate_position && !stream->rotate_ready) {
+               ret = 0;
+               goto end;
+       }
+
+       /*
+        * If we don't have the rotate_ready flag, check the consumed position
+        * to determine if we need to rotate.
+        */
+       if (!stream->rotate_ready) {
+               ret = lttng_consumer_sample_snapshot_positions(stream);
+               if (ret < 0) {
+                       ERR("Taking kernel snapshot positions");
+                       goto error;
+               }
+
+               ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
+               if (ret < 0) {
+                       ERR("Produced kernel snapshot position");
+                       goto error;
+               }
+
+               fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
+               /* Rotate position not reached yet. */
+               if (consumed_pos < stream->rotate_position) {
+                       ret = 0;
+                       goto end;
+               }
+               fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
+                               consumed_pos, stream->rotate_position, stream->key);
+       } else {
+               fprintf(stderr, "Rotate position reached for stream %lu\n",
+                               stream->key);
+       }
+
+       ret = close(stream->out_fd);
+       if (ret < 0) {
+               PERROR("Closing tracefile");
+               goto error;
+       }
+
+       fprintf(stderr, "Rotating stream %lu to %s/%s\n", stream->key,
+                       stream->channel_ro_pathname, stream->name);
+       ret = utils_create_stream_file(stream->channel_ro_pathname, stream->name,
+                       stream->channel_ro_tracefile_size, stream->tracefile_count_current,
+                       stream->uid, stream->gid, NULL);
+       if (ret < 0) {
+               goto error;
+       }
+       stream->out_fd = ret;
+       stream->tracefile_size_current = 0;
+
+       if (!stream->metadata_flag) {
+               struct lttng_index_file *index_file;
+
+               lttng_index_file_put(stream->index_file);
+
+               index_file = lttng_index_file_create(stream->channel_ro_pathname,
+                               stream->name, stream->uid, stream->gid,
+                               stream->channel_ro_tracefile_size,
+                               stream->tracefile_count_current,
+                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+               if (!index_file) {
+                       goto error;
+               }
+               stream->index_file = index_file;
+               stream->out_fd_offset = 0;
+       } else {
+               switch (consumer_data.type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       /*
+                        * Reset the position of what has been read from the metadata
+                        * cache to 0 so we can dump it again.
+                        */
+                       ret = kernctl_metadata_cache_dump(stream->wait_fd);
+                       if (ret < 0) {
+                               ERR("Failed to dump the kernel metadata cache after rotation");
+                               goto error;
+                       }
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * Reset the position pushed from the metadata cache so it
+                        * will write from the beginning on the next push.
+                        */
+                       stream->ust_metadata_pushed = 0;
+                       break;
+               default:
+                       ERR("Unknown consumer_data type");
+                       abort();
+               }
+       }
+
+       stream->rotate_position = 0;
+       stream->rotate_ready = 0;
+       stream->rotated = 1;
+
+       ret = 0;
+       goto end;
+
+error:
+       ret = -1;
+end:
+       return ret;
+}
+
+/*
+ * Rotate all the ready streams.
+ *
+ * This is especially important for low throughput streams that have already
+ * been consumed, we cannot wait for their next packet to perform the
+ * rotation.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_ready_streams(uint64_t key,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht_iter iter;
+       struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+       rcu_read_lock();
+
+       channel = consumer_find_channel(key);
+       if (!channel) {
+               ERR("No channel found for key %" PRIu64, key);
+               ret = -1;
+               goto end;
+       }
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key, &iter.iter,
+                       stream, node_channel_id.node) {
+               health_code_update();
+
+               /*
+                * Lock stream because we are about to change its state.
+                */
+               pthread_mutex_lock(&stream->lock);
+               if (stream->rotate_ready == 0) {
+                       pthread_mutex_unlock(&stream->lock);
+                       continue;
+               }
+               ret = lttng_consumer_rotate_stream(ctx, stream);
+               if (ret < 0) {
+                       pthread_mutex_unlock(&stream->lock);
+                       ERR("Stream rotation error");
+                       goto end;
+               }
+
+               pthread_mutex_unlock(&stream->lock);
+               ret = consumer_post_rotation(stream, ctx);
+               if (ret < 0) {
+                       ERR("Failed after a rotation");
+                       goto end;
+               }
+       }
+
+       ret = 0;
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
This page took 0.033239 seconds and 5 git commands to generate.