* Find a stream. The consumer_data.lock must be locked during this
* call.
*/
-static struct lttng_consumer_stream *find_stream(int key,
+static struct lttng_consumer_stream *find_stream(uint64_t key,
struct lttng_ht *ht)
{
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_consumer_stream *stream = NULL;
assert(ht);
- /* Negative keys are lookup failures */
- if (key < 0) {
+ /* -1ULL keys are lookup failures */
+ if (key == (uint64_t) -1ULL) {
return NULL;
}
rcu_read_lock();
- lttng_ht_lookup(ht, (void *)((unsigned long) key), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ lttng_ht_lookup(ht, &key, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
stream = caa_container_of(node, struct lttng_consumer_stream, node);
}
rcu_read_lock();
stream = find_stream(key, ht);
if (stream) {
- stream->key = -1;
+ stream->key = -1ULL;
/*
* We don't want the lookup to match, but we still need
* to iterate on this stream when iterating over the hash table. Just
* change the node key.
*/
- stream->node.key = -1;
+ stream->node.key = -1ULL;
}
rcu_read_unlock();
}
* RCU read side lock MUST be acquired before calling this function and
* protects the channel ptr.
*/
-struct lttng_consumer_channel *consumer_find_channel(unsigned long key)
+struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
{
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_consumer_channel *channel = NULL;
- /* Negative keys are lookup failures */
- if (key < 0) {
+ /* -1ULL keys are lookup failures */
+ if (key == (uint64_t) -1ULL) {
return NULL;
}
- lttng_ht_lookup(consumer_data.channel_ht, (void *) key, &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
channel = caa_container_of(node, struct lttng_consumer_channel, node);
}
static void free_stream_rcu(struct rcu_head *head)
{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_ht_node_u64 *node =
+ caa_container_of(head, struct lttng_ht_node_u64, head);
struct lttng_consumer_stream *stream =
caa_container_of(node, struct lttng_consumer_stream, node);
static void free_channel_rcu(struct rcu_head *head)
{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_ht_node_u64 *node =
+ caa_container_of(head, struct lttng_ht_node_u64, head);
struct lttng_consumer_channel *channel =
caa_container_of(node, struct lttng_consumer_channel, node);
*/
static void free_relayd_rcu(struct rcu_head *head)
{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_ht_node_u64 *node =
+ caa_container_of(head, struct lttng_ht_node_u64, head);
struct consumer_relayd_sock_pair *relayd =
caa_container_of(node, struct consumer_relayd_sock_pair, node);
int ret;
struct lttng_ht_iter iter;
- DBG("Consumer delete channel key %d", channel->key);
+ DBG("Consumer delete channel key %" PRIu64, channel->key);
pthread_mutex_lock(&consumer_data.lock);
call_rcu(&stream->node.head, free_stream_rcu);
}
-struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
- int stream_key,
+struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+ uint64_t stream_key,
enum lttng_consumer_stream_state state,
const char *channel_name,
uid_t uid,
}
/* Key is always the wait_fd for streams. */
- lttng_ht_node_init_ulong(&stream->node, stream->key);
+ lttng_ht_node_init_u64(&stream->node, stream->key);
/* Init session id node with the stream session id */
- lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
+ lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
- DBG3("Allocated stream %s (key %d, relayd_id %d, session_id %" PRIu64,
+ DBG3("Allocated stream %s (key %" PRIu64 ", relayd_id %" PRIu64 ", session_id %" PRIu64,
stream->name, stream->key, stream->net_seq_idx, stream->session_id);
rcu_read_unlock();
assert(stream);
assert(ht);
- DBG3("Adding consumer stream %d", stream->key);
+ DBG3("Adding consumer stream %" PRIu64, stream->key);
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->lock);
/* Steal stream identifier to avoid having streams with the same key */
steal_stream_key(stream->key, ht);
- lttng_ht_add_unique_ulong(ht, &stream->node);
+ lttng_ht_add_unique_u64(ht, &stream->node);
/*
* Add stream to the stream_list_ht of the consumer data. No need to steal
* the key since the HT does not use it and we allow to add redundant keys
* into this table.
*/
- lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+ lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
/* Check and cleanup relayd */
relayd = consumer_find_relayd(stream->net_seq_idx);
static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret = 0;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
assert(relayd);
lttng_ht_lookup(consumer_data.relayd_ht,
- (void *)((unsigned long) relayd->net_seq_idx), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ &relayd->net_seq_idx, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
goto end;
}
- lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
+ lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
end:
return ret;
obj->net_seq_idx = net_seq_idx;
obj->refcount = 0;
obj->destroy_flag = 0;
- lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
+ lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
error:
* RCU read-side lock must be held across this call and while using the
* returned object.
*/
-struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
+struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
{
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct consumer_relayd_sock_pair *relayd = NULL;
/* Negative keys are lookup failures */
- if (key < 0) {
+ if (key == (uint64_t) -1ULL) {
goto error;
}
- lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
+ lttng_ht_lookup(consumer_data.relayd_ht, &key,
&iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
}
strncpy(channel->name, name, sizeof(channel->name));
channel->name[sizeof(channel->name) - 1] = '\0';
- lttng_ht_node_init_ulong(&channel->node, channel->key);
+ lttng_ht_node_init_u64(&channel->node, channel->key);
CDS_INIT_LIST_HEAD(&channel->streams.head);
- DBG("Allocated channel (key %d)", channel->key)
+ DBG("Allocated channel (key %" PRIu64 ")", channel->key)
end:
return channel;
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
int ret = 0;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
rcu_read_lock();
lttng_ht_lookup(consumer_data.channel_ht,
- (void *)((unsigned long) channel->key), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ &channel->key, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
/* Channel already exist. Ignore the insertion */
- ERR("Consumer add channel key %d already exists!", channel->key);
+ ERR("Consumer add channel key %" PRIu64 " already exists!",
+ channel->key);
ret = -1;
goto end;
}
- lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
+ lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
end:
rcu_read_unlock();
lttng_ht_destroy(ht);
}
+void lttng_consumer_close_metadata(void)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ /*
+ * The Kernel consumer has a different metadata scheme so we don't
+ * close anything because the stream will be closed by the session
+ * daemon.
+ */
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /*
+ * Close all metadata streams. The metadata hash table is passed and
+ * this call iterates over it by closing all wakeup fd. This is safe
+ * because at this point we are sure that the metadata producer is
+ * either dead or blocked.
+ */
+ lttng_ustconsumer_close_metadata(metadata_ht);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+}
+
/*
* Clean up a metadata stream and free its memory.
*/
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
assert(stream);
assert(ht);
- DBG3("Adding metadata stream %d to hash table", stream->key);
+ DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->lock);
* Lookup the stream just to make sure it does not exist in our internal
* state. This should NEVER happen.
*/
- lttng_ht_lookup(ht, (void *)((unsigned long) stream->key), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ lttng_ht_lookup(ht, &stream->key, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
assert(!node);
/* Find relayd and, if one is found, increment refcount. */
uatomic_dec(&stream->chan->nb_init_stream_left);
}
- lttng_ht_add_unique_ulong(ht, &stream->node);
+ lttng_ht_add_unique_u64(ht, &stream->node);
/*
* Add stream to the stream_list_ht of the consumer data. No need to steal
* the key since the HT does not use it and we allow to add redundant keys
* into this table.
*/
- lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+ lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
rcu_read_unlock();
uint32_t revents, nb_fd;
struct lttng_consumer_stream *stream = NULL;
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_poll_event events;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
rcu_register_thread();
- metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!metadata_ht) {
/* ENOMEM at this point. Better to bail out. */
goto error;
}
rcu_read_lock();
- lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
- &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ {
+ uint64_t tmp_id = (uint64_t) pollfd;
+
+ lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
+ }
+ node = lttng_ht_iter_get_node_u64(&iter);
assert(node);
stream = caa_container_of(node, struct lttng_consumer_stream,
rcu_register_thread();
- data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (data_ht == NULL) {
/* ENOMEM at this point. Better to bail out. */
goto end;
ret = add_stream(new_stream, data_ht);
if (ret) {
- ERR("Consumer add stream %d failed. Continuing",
+ ERR("Consumer add stream %" PRIu64 " failed. Continuing",
new_stream->key);
/*
* At this point, if the add_stream fails, it is not in the
end:
DBG("Consumer thread sessiond poll exiting");
+ /*
+ * Close metadata streams since the producer is the session daemon which
+ * just died.
+ *
+ * NOTE: for now, this only applies to the UST tracer.
+ */
+ lttng_consumer_close_metadata();
+
/*
* when all fds have hung up, the polling thread
* can exit cleanly
*/
void lttng_consumer_init(void)
{
- consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
}
/*
goto error;
}
- DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
+ DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
relayd->net_seq_idx, fd);
}
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
- ht->match_fct, (void *)((unsigned long) id),
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
&iter.iter, stream, node_session_id.node) {
/* If this call fails, the stream is being used hence data pending. */
ret = stream_try_lock(stream);