Fix: sessiond vs consumerd push/get metadata deadlock
[lttng-tools.git] / src / common / consumer-timer.c
index 646d32342cdfa0a7a7552332dd3f5da2c90cc680..8bf3ae80c2c4f4f35bf91780057f11f5f6a7b786 100644 (file)
@@ -133,78 +133,103 @@ error:
        return ret;
 }
 
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
 {
        uint64_t ts, stream_id;
        int ret;
 
-       /*
-        * While holding the stream mutex, try to take a snapshot, if it
-        * succeeds, it means that data is ready to be sent, just let the data
-        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
-        * means that there is no data to read after the flush, so we can
-        * safely send the empty index.
-        */
-       pthread_mutex_lock(&stream->lock);
        ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
        if (ret < 0) {
                ERR("Failed to get the current timestamp");
-               goto error_unlock;
+               goto end;
        }
        ret = kernctl_buffer_flush(stream->wait_fd);
        if (ret < 0) {
                ERR("Failed to flush kernel stream");
-               goto error_unlock;
+               goto end;
        }
        ret = kernctl_snapshot(stream->wait_fd);
        if (ret < 0) {
                if (errno != EAGAIN && errno != ENODATA) {
                        PERROR("live timer kernel snapshot");
                        ret = -1;
-                       goto error_unlock;
+                       goto end;
                }
                ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
                if (ret < 0) {
                        PERROR("kernctl_get_stream_id");
-                       goto error_unlock;
+                       goto end;
                }
                DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
                ret = send_empty_index(stream, ts, stream_id);
                if (ret < 0) {
-                       goto error_unlock;
+                       goto end;
                }
        }
        ret = 0;
-
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
+end:
        return ret;
 }
 
-static int check_ust_stream(struct lttng_consumer_stream *stream)
+static int check_kernel_stream(struct lttng_consumer_stream *stream)
 {
-       uint64_t ts, stream_id;
        int ret;
 
-       assert(stream);
-       assert(stream->ustream);
        /*
         * While holding the stream mutex, try to take a snapshot, if it
         * succeeds, it means that data is ready to be sent, just let the data
         * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
         * means that there is no data to read after the flush, so we can
         * safely send the empty index.
+        *
+        * Doing a trylock and checking if waiting on metadata if
+        * trylock fails. Bail out of the stream is indeed waiting for
+        * metadata to be pushed. Busy wait on trylock otherwise.
         */
-       pthread_mutex_lock(&stream->lock);
+       for (;;) {
+               ret = pthread_mutex_trylock(&stream->lock);
+               switch (ret) {
+               case 0:
+                       break;  /* We have the lock. */
+               case EBUSY:
+                       pthread_mutex_lock(&stream->metadata_timer_lock);
+                       if (stream->waiting_on_metadata) {
+                               ret = 0;
+                               stream->missed_metadata_flush = true;
+                               pthread_mutex_unlock(&stream->metadata_timer_lock);
+                               goto end;       /* Bail out. */
+                       }
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       /* Try again. */
+                       caa_cpu_relax();
+                       continue;
+               default:
+                       ERR("Unexpected pthread_mutex_trylock error %d", ret);
+                       ret = -1;
+                       goto end;
+               }
+               break;
+       }
+       ret = consumer_flush_kernel_index(stream);
+       pthread_mutex_unlock(&stream->lock);
+end:
+       return ret;
+}
+
+int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
+{
+       uint64_t ts, stream_id;
+       int ret;
+
        ret = cds_lfht_is_node_deleted(&stream->node.node);
        if (ret) {
-               goto error_unlock;
+               goto end;
        }
 
        ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
        if (ret < 0) {
                ERR("Failed to get the current timestamp");
-               goto error_unlock;
+               goto end;
        }
        lttng_ustconsumer_flush_buffer(stream, 1);
        ret = lttng_ustconsumer_take_snapshot(stream);
@@ -212,23 +237,68 @@ static int check_ust_stream(struct lttng_consumer_stream *stream)
                if (ret != -EAGAIN) {
                        ERR("Taking UST snapshot");
                        ret = -1;
-                       goto error_unlock;
+                       goto end;
                }
                ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
                if (ret < 0) {
                        PERROR("ustctl_get_stream_id");
-                       goto error_unlock;
+                       goto end;
                }
                DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
                ret = send_empty_index(stream, ts, stream_id);
                if (ret < 0) {
-                       goto error_unlock;
+                       goto end;
                }
        }
        ret = 0;
+end:
+       return ret;
+}
 
-error_unlock:
+static int check_ust_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       assert(stream);
+       assert(stream->ustream);
+       /*
+        * While holding the stream mutex, try to take a snapshot, if it
+        * succeeds, it means that data is ready to be sent, just let the data
+        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+        * means that there is no data to read after the flush, so we can
+        * safely send the empty index.
+        *
+        * Doing a trylock and checking if waiting on metadata if
+        * trylock fails. Bail out of the stream is indeed waiting for
+        * metadata to be pushed. Busy wait on trylock otherwise.
+        */
+       for (;;) {
+               ret = pthread_mutex_trylock(&stream->lock);
+               switch (ret) {
+               case 0:
+                       break;  /* We have the lock. */
+               case EBUSY:
+                       pthread_mutex_lock(&stream->metadata_timer_lock);
+                       if (stream->waiting_on_metadata) {
+                               ret = 0;
+                               stream->missed_metadata_flush = true;
+                               pthread_mutex_unlock(&stream->metadata_timer_lock);
+                               goto end;       /* Bail out. */
+                       }
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       /* Try again. */
+                       caa_cpu_relax();
+                       continue;
+               default:
+                       ERR("Unexpected pthread_mutex_trylock error %d", ret);
+                       ret = -1;
+                       goto end;
+               }
+               break;
+       }
+       ret = consumer_flush_ust_index(stream);
        pthread_mutex_unlock(&stream->lock);
+end:
        return ret;
 }
 
This page took 0.028378 seconds and 5 git commands to generate.