working for metadata on the relay
authorJulien Desfossez <jdesfossez@efficios.com>
Thu, 7 Sep 2017 18:13:44 +0000 (14:13 -0400)
committerJulien Desfossez <jdesfossez@efficios.com>
Thu, 7 Sep 2017 18:13:44 +0000 (14:13 -0400)
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
src/bin/lttng-relayd/main.c
src/common/relayd/relayd.c

index c6d61a186f095f339193a509c701f67e56d73bbf..348e161b77899e96500ee9a0f4425effc623bd09 100644 (file)
@@ -1537,6 +1537,37 @@ end:
        return ret;
 }
 
+static
+int do_rotate_stream(struct relay_stream *stream)
+{
+       int ret;
+
+       /* Perform the stream rotation. */
+       ret = utils_rotate_stream_file(stream->path_name,
+                       stream->channel_name, stream->tracefile_size,
+                       stream->tracefile_count, -1,
+                       -1, stream->stream_fd->fd,
+                       NULL, &stream->stream_fd->fd);
+       if (ret < 0) {
+               ERR("Rotating stream output file");
+               goto end;
+       }
+       stream->tracefile_size_current = 0;
+
+       /* Rotate also the index if the stream is not a metadata stream. */
+       if (!stream->is_metadata) {
+               ret = rotate_index_file(stream);
+               if (ret < 0) {
+                       ERR("Failed to rotate index file");
+                       goto end;
+               }
+       }
+
+       stream->rotate_at_seq_num = -1ULL;
+end:
+       return ret;
+}
+
 /*
  * Check if a stream should perform a rotation (for session rotation).
  * Must be called with the stream lock held.
@@ -1574,28 +1605,7 @@ int check_rotate_stream(struct relay_stream *stream)
                fprintf(stderr, "Stream %" PRIu64 " ready for rotation\n", stream->stream_handle);
        }
 
-       /* Perform the stream rotation. */
-       ret = utils_rotate_stream_file(stream->path_name,
-                       stream->channel_name, stream->tracefile_size,
-                       stream->tracefile_count, -1,
-                       -1, stream->stream_fd->fd,
-                       NULL, &stream->stream_fd->fd);
-       if (ret < 0) {
-               ERR("Rotating stream output file");
-               goto end;
-       }
-       stream->tracefile_size_current = 0;
-
-       /* Rotate also the index if the stream is not a metadata stream. */
-       if (!stream->is_metadata) {
-               ret = rotate_index_file(stream);
-               if (ret < 0) {
-                       ERR("Failed to rotate index file");
-                       goto end;
-               }
-       }
-
-       stream->rotate_at_seq_num = -1ULL;
+       ret = do_rotate_stream(stream);
 
 end:
        return ret;
@@ -2278,11 +2288,6 @@ static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
-       fprintf(stderr, "Stream %lu needs to rotate to %s (%lu vs %lu)/\n",
-                       be64toh(stream_info.stream_id), stream_info.new_pathname,
-                       stream->prev_seq, be64toh(stream_info.rotate_at_seq_num));
-       fprintf(stderr, "metadata: %d\n", stream->is_metadata);
-
        pthread_mutex_lock(&stream->lock);
 
        /* Update the trace path (just the folder, the stream name does not change). */
@@ -2298,11 +2303,19 @@ static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr,
                ERR("relay creating output directory");
                goto end;
        }
-       stream->rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
        stream->chunk_id = be64toh(stream_info.new_chunk_id);
-       /* FIXME: YOU ARE HERE
-        * metadata stream does not have a valid seq_num, but is sent on the control connection */
-       ret = check_rotate_stream(stream);
+
+       if (stream->is_metadata) {
+               /*
+                * The metadata stream is sent only over the control connection
+                * so we know we have all the data to perform the stream
+                * rotation.
+                */
+               ret = do_rotate_stream(stream);
+       } else {
+               stream->rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+               ret = check_rotate_stream(stream);
+       }
        if (ret < 0) {
                ERR("Check rotate stream");
                goto end_stream_unlock;
index 6e3e91bfebbe861c51f62a7c58e94dc516dea695..ed8136193927145dcad5b9fb7232ebfe41ba5c1c 100644 (file)
@@ -993,8 +993,12 @@ int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
 
        memset(&msg, 0, sizeof(msg));
        msg.stream_id = htobe64(stream_id);
-       msg.rotate_at_seq_num = htobe64(seq_num);
        msg.new_chunk_id = htobe64(new_chunk_id);
+       /*
+        * the seq_num is invalid for metadata streams, but it is ignored on
+        * the relay.
+        */
+       msg.rotate_at_seq_num = htobe64(seq_num);
        if (lttng_strncpy(msg.new_pathname, new_pathname,
                                sizeof(msg.new_pathname))) {
                ret = -1;
This page took 0.029349 seconds and 5 git commands to generate.