Perform local data pending check then relayd
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Mon, 23 Jul 2018 03:38:34 +0000 (23:38 -0400)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Fri, 21 Sep 2018 04:42:01 +0000 (00:42 -0400)
Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
src/common/consumer/consumer.c

index 7b87a85fe600a21503177de42842b4a1e878bf3e..482d4a63bd00a98c75ecb681390e4c211f69b62b 100644 (file)
@@ -3604,21 +3604,6 @@ int consumer_data_pending(uint64_t id)
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
-       relayd = find_relayd_by_session_id(id);
-       if (relayd) {
-               /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_begin_data_pending(&relayd->control_sock,
-                               relayd->relayd_session_id);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       /* Communication error thus the relayd so no data pending. */
-                       ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
-                       lttng_consumer_cleanup_relayd(relayd);
-                       goto data_not_pending;
-               }
-       }
-
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&id, lttng_ht_seed),
                        ht->match_fct, &id,
@@ -3641,9 +3626,29 @@ int consumer_data_pending(uint64_t id)
                        }
                }
 
-               /* Relayd check */
-               if (relayd) {
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       relayd = find_relayd_by_session_id(id);
+       if (relayd) {
+               unsigned int is_data_inflight = 0;
+
+               /* Send init command for data pending. */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_begin_data_pending(&relayd->control_sock,
+                               relayd->relayd_session_id);
+               if (ret < 0) {
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                       /* Communication error thus the relayd so no data pending. */
+                       ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       goto data_not_pending;
+               }
+
+               cds_lfht_for_each_entry_duplicate(ht->ht,
+                               ht->hash_fct(&id, lttng_ht_seed),
+                               ht->match_fct, &id,
+                               &iter.iter, stream, node_session_id.node) {
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock,
                                                stream->relayd_stream_id);
@@ -3652,27 +3657,19 @@ int consumer_data_pending(uint64_t id)
                                                stream->relayd_stream_id,
                                                stream->next_net_seq_num - 1);
                        }
-                       if (ret < 0) {
-                               ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
-                               lttng_consumer_cleanup_relayd(relayd);
+                       if (ret == 1) {
                                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                                pthread_mutex_unlock(&stream->lock);
-                               goto data_not_pending;
+                               goto data_pending;
                        }
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-                       if (ret == 1) {
+                       if (ret < 0) {
+                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                                pthread_mutex_unlock(&stream->lock);
-                               goto data_pending;
+                               goto data_not_pending;
                        }
                }
-               pthread_mutex_unlock(&stream->lock);
-       }
-
-       if (relayd) {
-               unsigned int is_data_inflight = 0;
 
-               /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               /* Send end command for data pending. */
                ret = relayd_end_data_pending(&relayd->control_sock,
                                relayd->relayd_session_id, &is_data_inflight);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
This page took 0.028594 seconds and 5 git commands to generate.