/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
* David Goulet <dgoulet@efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
#include <common/utils.h>
#include "lttng-relayd.h"
+#include "stream.h"
#include "index.h"
/*
- * Deferred free of a relay index object. MUST only be called by a call RCU.
+ * Allocate a new relay index object. Pass the stream in which it is
+ * contained as parameter. The sequence number will be used as the hash
+ * table key.
+ *
+ * Called with stream mutex held.
+ * Return allocated object or else NULL on error.
*/
-static void deferred_free_relay_index(struct rcu_head *head)
+static struct relay_index *relay_index_create(struct relay_stream *stream,
+ uint64_t net_seq_num)
{
- struct relay_index *index =
- caa_container_of(head, struct relay_index, rcu_node);
+ struct relay_index *index;
- if (index->to_close_fd >= 0) {
- int ret;
+ DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
+ stream->stream_handle, net_seq_num);
- ret = close(index->to_close_fd);
- if (ret < 0) {
- PERROR("Relay index to close fd %d", index->to_close_fd);
- }
+ index = zmalloc(sizeof(*index));
+ if (!index) {
+ PERROR("Relay index zmalloc");
+ goto end;
+ }
+ if (!stream_get(stream)) {
+ ERR("Cannot get stream");
+ free(index);
+ index = NULL;
+ goto end;
}
+ index->stream = stream;
- relay_index_free(index);
+ lttng_ht_node_init_u64(&index->index_n, net_seq_num);
+ pthread_mutex_init(&index->lock, NULL);
+ pthread_mutex_init(&index->reflock, NULL);
+ urcu_ref_init(&index->ref);
+
+end:
+ return index;
}
/*
- * Allocate a new relay index object using the given stream ID and sequence
- * number as the hash table key.
+ * Add unique relay index to the given hash table. In case of a collision, the
+ * already existing object is put in the given _index variable.
*
- * Return allocated object or else NULL on error.
+ * RCU read side lock MUST be acquired.
*/
-struct relay_index *relay_index_create(uint64_t stream_id,
- uint64_t net_seq_num)
+static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
+ struct relay_index *index)
{
- struct relay_index *index;
+ struct cds_lfht_node *node_ptr;
+ struct relay_index *_index;
- DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
- stream_id, net_seq_num);
+ DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
+ stream->stream_handle, index->index_n.key);
- index = zmalloc(sizeof(*index));
- if (index == NULL) {
- PERROR("Relay index zmalloc");
- goto error;
+ node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
+ stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
+ stream->indexes_ht->match_fct, &index->index_n,
+ &index->index_n.node);
+ if (node_ptr != &index->index_n.node) {
+ _index = caa_container_of(node_ptr, struct relay_index,
+ index_n.node);
+ } else {
+ _index = NULL;
}
+ return _index;
+}
+
+/*
+ * Should be called with RCU read-side lock held.
+ */
+static bool relay_index_get(struct relay_index *index)
+{
+ bool has_ref = false;
- index->to_close_fd = -1;
- lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num);
+ DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
+ index->stream->stream_handle, index->index_n.key,
+ (int) index->ref.refcount);
-error:
- return index;
+ /* Confirm that the index refcount has not reached 0. */
+ pthread_mutex_lock(&index->reflock);
+ if (index->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&index->ref);
+ }
+ pthread_mutex_unlock(&index->reflock);
+
+ return has_ref;
}
/*
- * Find a relayd index in the given hash table.
+ * Get a relayd index in within the given stream, or create it if not
+ * present.
*
+ * Called with stream mutex held.
* Return index object or else NULL on error.
*/
-struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num)
+struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
+ uint64_t net_seq_num)
{
- struct lttng_ht_node_two_u64 *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
- struct lttng_ht_two_u64 key;
struct relay_index *index = NULL;
DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
- stream_id, net_seq_num);
-
- key.key1 = stream_id;
- key.key2 = net_seq_num;
+ stream->stream_handle, net_seq_num);
- lttng_ht_lookup(indexes_ht, (void *)(&key), &iter);
- node = lttng_ht_iter_get_node_two_u64(&iter);
- if (node == NULL) {
- goto end;
+ rcu_read_lock();
+ lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
+ if (node) {
+ index = caa_container_of(node, struct relay_index, index_n);
+ } else {
+ struct relay_index *oldindex;
+
+ index = relay_index_create(stream, net_seq_num);
+ if (!index) {
+ ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
+ index->stream->stream_handle, net_seq_num);
+ goto end;
+ }
+ oldindex = relay_index_add_unique(stream, index);
+ if (oldindex) {
+ /* Added concurrently, keep old. */
+ relay_index_put(index);
+ index = oldindex;
+ if (!relay_index_get(index)) {
+ index = NULL;
+ }
+ } else {
+ stream->indexes_in_flight++;
+ index->in_hash_table = true;
+ }
}
- index = caa_container_of(node, struct relay_index, index_n);
-
end:
- DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
- (index == NULL) ? "NOT " : "", stream_id, net_seq_num);
+ rcu_read_unlock();
+ DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
+ (index == NULL) ? "NOT " : "", index->stream->stream_handle, net_seq_num);
return index;
}
-/*
- * Add unique relay index to the given hash table. In case of a collision, the
- * already existing object is put in the given _index variable.
- *
- * RCU read side lock MUST be acquired.
- */
-void relay_index_add(struct relay_index *index, struct relay_index **_index)
+int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+ uint64_t data_offset)
{
- struct cds_lfht_node *node_ptr;
+ int ret = 0;
- assert(index);
+ pthread_mutex_lock(&index->lock);
+ if (index->index_fd) {
+ ret = -1;
+ goto end;
+ }
+ stream_fd_get(index_fd);
+ index->index_fd = index_fd;
+ index->index_data.offset = data_offset;
+end:
+ pthread_mutex_unlock(&index->lock);
+ return ret;
+}
- DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
- index->index_n.key.key1, index->index_n.key.key2);
+int relay_index_set_data(struct relay_index *index,
+ const struct ctf_packet_index *data)
+{
+ int ret = 0;
- node_ptr = cds_lfht_add_unique(indexes_ht->ht,
- indexes_ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed),
- indexes_ht->match_fct, (void *) &index->index_n.key,
- &index->index_n.node);
- if (node_ptr != &index->index_n.node) {
- *_index = caa_container_of(node_ptr, struct relay_index, index_n.node);
+ pthread_mutex_lock(&index->lock);
+ if (index->has_index_data) {
+ ret = -1;
+ goto end;
}
+ /* Set everything except data_offset. */
+ index->index_data.packet_size = data->packet_size;
+ index->index_data.content_size = data->content_size;
+ index->index_data.timestamp_begin = data->timestamp_begin;
+ index->index_data.timestamp_end = data->timestamp_end;
+ index->index_data.events_discarded = data->events_discarded;
+ index->index_data.stream_id = data->stream_id;
+ index->has_index_data = true;
+end:
+ pthread_mutex_unlock(&index->lock);
+ return ret;
}
-/*
- * Write index on disk to the given fd. Once done error or not, it is removed
- * from the hash table and destroy the object.
- *
- * MUST be called with a RCU read side lock held.
- *
- * Return 0 on success else a negative value.
- */
-int relay_index_write(int fd, struct relay_index *index)
+static void index_destroy(struct relay_index *index)
{
- int ret;
- struct lttng_ht_iter iter;
-
- DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
- " on fd %d", index->index_n.key.key1,
- index->index_n.key.key2, fd);
+ free(index);
+}
- /* Delete index from hash table. */
- iter.iter.node = &index->index_n.node;
- ret = lttng_ht_del(indexes_ht, &iter);
- assert(!ret);
- call_rcu(&index->rcu_node, deferred_free_relay_index);
+static void index_destroy_rcu(struct rcu_head *rcu_head)
+{
+ struct relay_index *index =
+ caa_container_of(rcu_head, struct relay_index, rcu_node);
- return index_write(fd, &index->index_data, sizeof(index->index_data));
+ index_destroy(index);
}
-/*
- * Free the given index.
- */
-void relay_index_free(struct relay_index *index)
+/* Stream lock must be held by the caller. */
+static void index_release(struct urcu_ref *ref)
{
- free(index);
+ struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
+ struct relay_stream *stream = index->stream;
+ int ret;
+ struct lttng_ht_iter iter;
+
+ if (index->index_fd) {
+ stream_fd_put(index->index_fd);
+ index->index_fd = NULL;
+ }
+ if (index->in_hash_table) {
+ /* Delete index from hash table. */
+ iter.iter.node = &index->index_n.node;
+ ret = lttng_ht_del(stream->indexes_ht, &iter);
+ assert(!ret);
+ stream->indexes_in_flight--;
+ }
+
+ stream_put(index->stream);
+ index->stream = NULL;
+
+ call_rcu(&index->rcu_node, index_destroy_rcu);
}
/*
- * Safely free the given index using a call RCU.
+ * Called with stream mutex held.
+ *
+ * Stream lock must be held by the caller.
*/
-void relay_index_free_safe(struct relay_index *index)
+void relay_index_put(struct relay_index *index)
{
- if (!index) {
- return;
- }
-
- call_rcu(&index->rcu_node, deferred_free_relay_index);
+ DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
+ index->stream->stream_handle, index->index_n.key,
+ (int) index->ref.refcount);
+ /*
+ * Ensure existance of index->lock for index unlock.
+ */
+ rcu_read_lock();
+ /*
+ * Index lock ensures that concurrent test and update of stream
+ * ref is atomic.
+ */
+ pthread_mutex_lock(&index->reflock);
+ assert(index->ref.refcount != 0);
+ urcu_ref_put(&index->ref, index_release);
+ pthread_mutex_unlock(&index->reflock);
+ rcu_read_unlock();
}
/*
- * Delete index from the given hash table.
+ * Try to flush index to disk. Releases self-reference to index once
+ * flush succeeds.
*
- * RCU read side lock MUST be acquired.
+ * Stream lock must be held by the caller.
+ * Return 0 on successful flush, a negative value on error, or positive
+ * value if no flush was performed.
*/
-void relay_index_delete(struct relay_index *index)
+int relay_index_try_flush(struct relay_index *index)
{
- int ret;
- struct lttng_ht_iter iter;
+ int ret = 1;
+ bool flushed = false;
+ int fd;
- DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64
- " deleted.", index->index_n.key.key1,
- index->index_n.key.key2);
+ pthread_mutex_lock(&index->lock);
+ if (index->flushed) {
+ goto skip;
+ }
+ /* Check if we are ready to flush. */
+ if (!index->has_index_data || !index->index_fd) {
+ goto skip;
+ }
+ fd = index->index_fd->fd;
+ DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
+ " on fd %d", index->stream->stream_handle,
+ index->index_n.key, fd);
+ flushed = true;
+ index->flushed = true;
+ ret = index_write(fd, &index->index_data, sizeof(index->index_data));
+ if (ret == sizeof(index->index_data)) {
+ ret = 0;
+ } else {
+ ret = -1;
+ }
+skip:
+ pthread_mutex_unlock(&index->lock);
- /* Delete index from hash table. */
- iter.iter.node = &index->index_n.node;
- ret = lttng_ht_del(indexes_ht, &iter);
- assert(!ret);
+ if (flushed) {
+ /* Put self-ref from index now that it has been flushed. */
+ relay_index_put(index);
+ }
+ return ret;
}
/*
- * Destroy every relay index with the given stream id as part of the key.
+ * Close every relay index within a given stream, without flushing
+ * them.
*/
-void relay_index_destroy_by_stream_id(uint64_t stream_id)
+void relay_index_close_all(struct relay_stream *stream)
{
struct lttng_ht_iter iter;
struct relay_index *index;
rcu_read_lock();
- cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
- if (index->index_n.key.key1 == stream_id) {
- relay_index_delete(index);
- relay_index_free_safe(index);
- }
+ cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+ index, index_n.node) {
+ /* Put self-ref from index. */
+ relay_index_put(index);
}
rcu_read_unlock();
}