lttng_ht_node_init_u64(&index->index_n, net_seq_num);
pthread_mutex_init(&index->lock, NULL);
- pthread_mutex_init(&index->reflock, NULL);
urcu_ref_init(&index->ref);
end:
*/
static bool relay_index_get(struct relay_index *index)
{
- bool has_ref = false;
-
DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
index->stream->stream_handle, index->index_n.key,
(int) index->ref.refcount);
- /* Confirm that the index refcount has not reached 0. */
- pthread_mutex_lock(&index->reflock);
- if (index->ref.refcount != 0) {
- has_ref = true;
- urcu_ref_get(&index->ref);
- }
- pthread_mutex_unlock(&index->reflock);
-
- return has_ref;
+ return urcu_ref_get_unless_zero(&index->ref);
}
/*
end:
rcu_read_unlock();
DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
- (index == NULL) ? "NOT " : "", index->stream->stream_handle, net_seq_num);
+ (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
return index;
}
-int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+int relay_index_set_file(struct relay_index *index,
+ struct lttng_index_file *index_file,
uint64_t data_offset)
{
int ret = 0;
pthread_mutex_lock(&index->lock);
- if (index->index_fd) {
+ if (index->index_file) {
ret = -1;
goto end;
}
- stream_fd_get(index_fd);
- index->index_fd = index_fd;
+ lttng_index_file_get(index_file);
+ index->index_file = index_file;
index->index_data.offset = data_offset;
end:
pthread_mutex_unlock(&index->lock);
int ret;
struct lttng_ht_iter iter;
- if (index->index_fd) {
- stream_fd_put(index->index_fd);
- index->index_fd = NULL;
+ if (index->index_file) {
+ lttng_index_file_put(index->index_file);
+ index->index_file = NULL;
}
if (index->in_hash_table) {
/* Delete index from hash table. */
* Index lock ensures that concurrent test and update of stream
* ref is atomic.
*/
- pthread_mutex_lock(&index->reflock);
assert(index->ref.refcount != 0);
urcu_ref_put(&index->ref, index_release);
- pthread_mutex_unlock(&index->reflock);
rcu_read_unlock();
}
goto skip;
}
/* Check if we are ready to flush. */
- if (!index->has_index_data || !index->index_fd) {
+ if (!index->has_index_data || !index->index_file) {
goto skip;
}
- fd = index->index_fd->fd;
+ fd = index->index_file->fd;
DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
" on fd %d", index->stream->stream_handle,
index->index_n.key, fd);
flushed = true;
index->flushed = true;
- ret = index_write(fd, &index->index_data, sizeof(index->index_data));
- if (ret == sizeof(index->index_data)) {
- ret = 0;
- } else {
- ret = -1;
- }
+ ret = lttng_index_file_write(index->index_file, &index->index_data);
skip:
pthread_mutex_unlock(&index->lock);
rcu_read_lock();
cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
index, index_n.node) {
- if (!index->index_fd) {
+ if (!index->index_file) {
continue;
}
/*
- * Partial index has its index_fd: we have only
+ * Partial index has its index_file: we have only
* received its info from the data socket.
* Put self-ref from index.
*/