Cleanup: missing line in consumer-stream.c
[lttng-tools.git] / src / common / consumer / consumer-stream.c
index 522b3cd5cd8ac0c45876c12dd7e344c8c7c27118..97ba627dcc316a3ff95b7fa9a8182d8b8fe74473 100644 (file)
@@ -71,22 +71,18 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
        ret = relayd_send_close_stream(&relayd->control_sock,
                        stream->relayd_stream_id,
                        stream->next_net_seq_num - 1);
-       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        if (ret < 0) {
-               DBG("Unable to close stream on the relayd. Continuing");
-               /*
-                * Continue here. There is nothing we can do for the relayd.
-                * Chances are that the relayd has closed the socket so we just
-                * continue cleaning up.
-                */
+               ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->id);
+               lttng_consumer_cleanup_relayd(relayd);
        }
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 
        /* Both conditions are met, we destroy the relayd. */
        if (uatomic_read(&relayd->refcount) == 0 &&
                        uatomic_read(&relayd->destroy_flag)) {
                consumer_destroy_relayd(relayd);
        }
-       stream->net_seq_idx = (uint64_t) -1ULL;
+       stream->relayd_id = (uint64_t) -1ULL;
        stream->sent_to_relayd = 0;
 }
 
@@ -170,7 +166,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
 
        /* Check and cleanup relayd if needed. */
        rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
+       relayd = consumer_find_relayd(stream->relayd_id);
        if (relayd != NULL) {
                consumer_stream_relayd_close(stream, relayd);
        }
@@ -359,18 +355,34 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream,
                struct ctf_packet_index *element)
 {
        int ret;
-       struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
        assert(element);
 
        rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd) {
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_send_index(&relayd->control_sock, element,
+       if (stream->relayd_id != (uint64_t) -1ULL) {
+               struct consumer_relayd_sock_pair *relayd;
+
+               relayd = consumer_find_relayd(stream->relayd_id);
+               if (relayd) {
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       ret = relayd_send_index(&relayd->control_sock, element,
                                stream->relayd_stream_id, stream->next_net_seq_num - 1);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                       if (ret < 0) {
+                               /*
+                                * Communication error with lttng-relayd,
+                                * perform cleanup now
+                                */
+                               ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->id);
+                               lttng_consumer_cleanup_relayd(relayd);
+                               ret = -1;
+                       }
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               } else {
+                       ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
+                                       stream->key, stream->relayd_id);
+                       ret = -1;
+               }
        } else {
                if (lttng_index_file_write(stream->index_file, element)) {
                        ret = -1;
This page took 0.027247 seconds and 5 git commands to generate.