#include <common/common.h>
#include <common/utils.h>
+#include <common/compat/endian.h>
#include "lttng-relayd.h"
#include "stream.h"
lttng_ht_node_init_u64(&index->index_n, net_seq_num);
pthread_mutex_init(&index->lock, NULL);
- pthread_mutex_init(&index->reflock, NULL);
urcu_ref_init(&index->ref);
end:
*/
static bool relay_index_get(struct relay_index *index)
{
- bool has_ref = false;
-
DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
index->stream->stream_handle, index->index_n.key,
(int) index->ref.refcount);
- /* Confirm that the index refcount has not reached 0. */
- pthread_mutex_lock(&index->reflock);
- if (index->ref.refcount != 0) {
- has_ref = true;
- urcu_ref_get(&index->ref);
- }
- pthread_mutex_unlock(&index->reflock);
-
- return has_ref;
+ return urcu_ref_get_unless_zero(&index->ref);
}
/*
* Index lock ensures that concurrent test and update of stream
* ref is atomic.
*/
- pthread_mutex_lock(&index->reflock);
assert(index->ref.refcount != 0);
urcu_ref_put(&index->ref, index_release);
- pthread_mutex_unlock(&index->reflock);
rcu_read_unlock();
}
rcu_read_unlock();
return net_seq_num;
}
+
+/*
+ * Update the index file of an already existing relay_index.
+ * Offsets by 'removed_data_count' the offset field of an index.
+ */
+static
+int relay_index_switch_file(struct relay_index *index,
+ struct lttng_index_file *new_index_file,
+ uint64_t removed_data_count)
+{
+ int ret = 0;
+ uint64_t offset;
+
+ pthread_mutex_lock(&index->lock);
+ if (!index->index_file) {
+ ERR("No index_file");
+ ret = 0;
+ goto end;
+ }
+
+ lttng_index_file_put(index->index_file);
+ lttng_index_file_get(new_index_file);
+ index->index_file = new_index_file;
+ offset = be64toh(index->index_data.offset);
+ index->index_data.offset = htobe64(offset - removed_data_count);
+
+end:
+ pthread_mutex_unlock(&index->lock);
+ return ret;
+}
+
+/*
+ * Switch the index file of all pending indexes for a stream and update the
+ * data offset by substracting the last safe position.
+ * Stream lock must be held.
+ */
+int relay_index_switch_all_files(struct relay_stream *stream)
+{
+ struct lttng_ht_iter iter;
+ struct relay_index *index;
+ int ret = 0;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+ index, index_n.node) {
+ DBG("Update index to fd %d", stream->index_file->fd);
+ ret = relay_index_switch_file(index, stream->index_file,
+ stream->pos_after_last_complete_data_index);
+ if (ret) {
+ goto end;
+ }
+ }
+end:
+ rcu_read_unlock();
+ return ret;
+}