goto free_stream;
}
- pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
end:
consumer_data.need_update = 1;
- pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
consumer_del_channel(free_chan);
/* RCU lock for the relayd pointer */
rcu_read_lock();
- pthread_mutex_lock(&stream->lock);
-
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
return written;
/* RCU lock for the relayd pointer */
rcu_read_lock();
- pthread_mutex_lock(&stream->lock);
-
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
return written;
goto free_stream;
}
+ pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->lock);
- pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (stream->mmap_base != NULL) {
}
end:
- pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
consumer_del_channel(free_chan);
{
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
assert(stream);
assert(ht);
*/
rcu_read_lock();
+
+ /*
+ * Lookup the stream just to make sure it does not exist in our internal
+ * state. This should NEVER happen.
+ */
+ lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ assert(!node);
+
/* Find relayd and, if one is found, increment refcount. */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
uatomic_dec(&stream->chan->nb_init_streams);
}
- /* Steal stream identifier to avoid having streams with the same key */
- consumer_steal_stream_key(stream->key, ht);
-
lttng_ht_add_unique_ulong(ht, &stream->node);
/*
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
+ ssize_t ret;
+
+ pthread_mutex_lock(&stream->lock);
+
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- return lttng_kconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_kconsumer_read_subbuffer(stream, ctx);
+ break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- return lttng_ustconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
+ break;
default:
ERR("Unknown consumer_data type");
assert(0);
- return -ENOSYS;
+ ret = -ENOSYS;
+ break;
}
+
+ pthread_mutex_unlock(&stream->lock);
+ return ret;
}
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)