start to handle async rotation on the relay + basics for remote rotate_pending
[deliverable/lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index db6ec8bfffe1b8900fb3320192f39a1acdda09ec..2a02568b0dd74b328743fa36ccdc7559e3885cd4 100644 (file)
@@ -567,7 +567,8 @@ static int send_sessiond_channel(int sock,
                        health_code_update();
 
                        /* Try to send the stream to the relayd if one is available. */
-                       ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
+                       ret = consumer_send_relayd_stream(stream, stream->chan->pathname,
+                                       LTTNG_DOMAIN_UST);
                        if (ret < 0) {
                                /*
                                 * Flag that the relayd was the problem here probably due to a
@@ -905,7 +906,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        /* Send metadata stream to relayd if needed. */
        if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
                ret = consumer_send_relayd_stream(metadata->metadata_stream,
-                               metadata->pathname);
+                               metadata->pathname, LTTNG_DOMAIN_UST);
                if (ret < 0) {
                        ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
                        goto error;
@@ -1004,7 +1005,8 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
 
        if (relayd_id != (uint64_t) -1ULL) {
                metadata_stream->net_seq_idx = relayd_id;
-               ret = consumer_send_relayd_stream(metadata_stream, path);
+               ret = consumer_send_relayd_stream(metadata_stream, path,
+                               LTTNG_DOMAIN_UST);
                if (ret < 0) {
                        goto error_stream;
                }
@@ -1083,7 +1085,8 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                stream->net_seq_idx = relayd_id;
 
                if (use_relayd) {
-                       ret = consumer_send_relayd_stream(stream, path);
+                       ret = consumer_send_relayd_stream(stream, path,
+                                       LTTNG_DOMAIN_UST);
                        if (ret < 0) {
                                goto error_unlock;
                        }
@@ -1936,6 +1939,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.rotate_channel.pathname,
                                msg.u.rotate_channel.relayd_id,
                                msg.u.rotate_channel.metadata,
+                               msg.u.rotate_channel.new_chunk_id,
                                ctx);
                if (ret < 0) {
                        ERR("Rotate channel failed");
@@ -1965,6 +1969,29 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                break;
        }
+       case LTTNG_CONSUMER_ROTATE_RENAME:
+       {
+               DBG("Consumer rename session %" PRIu64 " after rotation",
+                               msg.u.rotate_rename.session_id);
+               ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.current_path,
+                               msg.u.rotate_rename.new_path,
+                               msg.u.rotate_rename.uid,
+                               msg.u.rotate_rename.gid,
+                               msg.u.rotate_rename.relayd_id);
+               if (ret < 0) {
+                       ERR("Rotate rename failed");
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               }
+
+               health_code_update();
+
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
+       }
        default:
                break;
        }
@@ -2557,7 +2584,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 1, rotation_ret;
+       int err, write_index = 1, rotation_ret, rotate_ready;
        long ret = 0;
        struct ustctl_consumer_stream *ustream;
        struct ctf_packet_index index;
@@ -2656,6 +2683,17 @@ retry:
        assert(len >= subbuf_size);
 
        padding = len - subbuf_size;
+
+       rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len);
+       if (rotate_ready < 0) {
+               ERR("Failed to check if stream is ready for rotation");
+               ret = -1;
+               err = ustctl_put_subbuf(ustream);
+               assert(err == 0);
+               goto error;
+       }
+       stream->rotate_ready = rotate_ready;
+
        /* write the subbuffer to the tracefile */
        ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
        /*
@@ -2729,11 +2767,13 @@ retry:
        }
 
 rotate:
-       rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
-       if (rotation_ret < 0) {
-               ret = -1;
-               ERR("Stream rotation error");
-               goto error;
+       if (stream->rotate_ready) {
+               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+               if (rotation_ret < 0) {
+                       ret = -1;
+                       ERR("Stream rotation error");
+                       goto error;
+               }
        }
 error:
        return ret;
This page took 0.025157 seconds and 5 git commands to generate.