Consumer perform the rotation when extracting a packet
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index f7704b46564a63273dabe03b9e181b4846ed4292..7c01bc772a008aa38a05a399c7acadf4849fd0a2 100644 (file)
@@ -385,7 +385,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
        do {
                health_code_update();
 
-               ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+               ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx, NULL);
                if (ret_read < 0) {
                        if (ret_read != -EAGAIN) {
                                ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
@@ -771,8 +771,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
-               DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
-                               new_stream->name, fd, new_stream->relayd_stream_id);
+               DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
+                               new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
                break;
        }
        case LTTNG_CONSUMER_STREAMS_SENT:
@@ -1406,16 +1406,30 @@ end:
  * Consume data on a file descriptor and write it on a trace file.
  */
 ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
+               struct lttng_consumer_local_data *ctx, bool *rotated)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 1;
+       int err, write_index = 1, rotation_ret;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
        struct ctf_packet_index index;
 
        DBG("In read_subbuffer (infd : %d)", infd);
 
+       /*
+        * If the stream was flagged to be ready for rotation before we extract the
+        * next packet, rotate it now.
+        */
+       if (stream->rotate_ready) {
+               DBG("Rotate stream before extracting data");
+               rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+               if (rotation_ret < 0) {
+                       ERR("Stream rotation error");
+                       ret = -1;
+                       goto error;
+               }
+       }
+
        /* Get the next subbuffer */
        err = kernctl_get_next_subbuf(infd);
        if (err != 0) {
@@ -1428,7 +1442,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                DBG("Reserving sub buffer failed (everything is normal, "
                                "it is due to concurrency)");
                ret = err;
-               goto end;
+               goto error;
        }
 
        /* Get the full subbuffer size including padding */
@@ -1444,10 +1458,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                        }
                        ret = err;
-                       goto end;
+                       goto error;
                }
                ret = err;
-               goto end;
+               goto error;
        }
 
        if (!stream->metadata_flag) {
@@ -1462,9 +1476,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                                }
                                ret = err;
-                               goto end;
+                               goto error;
                        }
-                       goto end;
+                       goto error;
                }
                ret = update_stream_stats(stream);
                if (ret < 0) {
@@ -1477,9 +1491,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                                }
                                ret = err;
-                               goto end;
+                               goto error;
                        }
-                       goto end;
+                       goto error;
                }
        } else {
                write_index = 0;
@@ -1494,9 +1508,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                                }
                                ret = err;
-                               goto end;
+                               goto error;
                        }
-                       goto end;
+                       goto error;
                }
        }
 
@@ -1541,10 +1555,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                                }
                                ret = err;
-                               goto end;
+                               goto error;
                        }
                        ret = err;
-                       goto end;
+                       goto error;
                }
 
                /* Make sure the tracer is not gone mad on us! */
@@ -1587,12 +1601,12 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                }
                ret = err;
-               goto end;
+               goto error;
        }
 
        /* Write index if needed. */
        if (!write_index) {
-               goto end;
+               goto rotate;
        }
 
        if (stream->chan->live_timer_interval && !stream->metadata_flag) {
@@ -1616,16 +1630,35 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        pthread_mutex_unlock(&stream->metadata_timer_lock);
                }
                if (err < 0) {
-                       goto end;
+                       goto error;
                }
        }
 
        err = consumer_stream_write_index(stream, &index);
        if (err < 0) {
-               goto end;
+               goto error;
        }
 
-end:
+rotate:
+       /*
+        * After extracting the packet, we check if the stream is now ready to be
+        * rotated and perform the action immediately.
+        */
+       rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+       if (rotation_ret == 1) {
+               rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+               if (rotation_ret < 0) {
+                       ERR("Stream rotation error");
+                       ret = -1;
+                       goto error;
+               }
+       } else if (rotation_ret < 0) {
+               ERR("Checking if stream is ready to rotate");
+               ret = -1;
+               goto error;
+       }
+
+error:
        return ret;
 }
 
This page took 0.027863 seconds and 5 git commands to generate.