/* RCU lock for the relayd pointer */
rcu_read_lock();
+ pthread_mutex_lock(&stream->lock);
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
lttng_consumer_sync_trace_file(stream, orig_offset);
end:
+ pthread_mutex_unlock(&stream->lock);
/* Unlock only if ctrl socket used */
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
/* RCU lock for the relayd pointer */
rcu_read_lock();
+ pthread_mutex_lock(&stream->lock);
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
}
end:
+ pthread_mutex_unlock(&stream->lock);
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
+ struct consumer_relayd_sock_pair *relayd;
int (*data_available)(struct lttng_consumer_stream *);
DBG("Consumer data available command on session id %" PRIu64, id);
assert(0);
}
+ rcu_read_lock();
+
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
- cds_lfht_for_each_entry_duplicate(ht->ht, (long unsigned int) ht->hash_fct,
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct((void *)((unsigned long) id), 0x42UL),
ht->match_fct, (void *)((unsigned long) id),
&iter.iter, stream, node_session_id.node) {
/* Check the stream for data. */
if (ret == 0) {
goto data_not_available;
}
- }
- /* TODO: Support to ask the relayd if the streams are remote */
+ if (stream->net_seq_idx != -1) {
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ assert(relayd);
+
+ pthread_mutex_lock(&stream->lock);
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ if (stream->metadata_flag) {
+ ret = relayd_quiescent_control(&relayd->control_sock);
+ } else {
+ ret = relayd_data_available(&relayd->control_sock,
+ stream->relayd_stream_id, stream->next_net_seq_num);
+ }
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&stream->lock);
+ if (ret == 0) {
+ goto data_not_available;
+ }
+ }
+ }
/*
* Finding _no_ node in the hash table means that the stream(s) have been
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
return 1;
data_not_available:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
return 0;
}