Consumer rotate stream
[lttng-tools.git] / src / common / relayd / relayd.c
index 458553bfe997069706aa9b43987a99d1cfcf0ece..242abbedff15c5c2f079460cb8805dbc567268cd 100644 (file)
@@ -942,6 +942,84 @@ error:
        return ret;
 }
 
+int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
+               const char *new_pathname, uint64_t new_chunk_id,
+               uint64_t seq_num)
+{
+       int ret;
+       struct lttcomm_relayd_rotate_stream *msg = NULL;
+       struct lttcomm_relayd_generic_reply reply;
+       size_t len;
+       int msg_len;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+
+       DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id);
+
+       /* Account for the trailing NULL. */
+       len = strnlen(new_pathname, LTTNG_PATH_MAX) + 1;
+       if (len > LTTNG_PATH_MAX) {
+               ERR("Path used in relayd rotate stream command exceeds the maximal allowed length");
+               ret = -1;
+               goto error;
+       }
+
+       msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len;
+       msg = zmalloc(msg_len);
+       if (!msg) {
+               PERROR("Failed to allocate relayd rotate stream command of %d bytes",
+                               msg_len);
+               ret = -1;
+               goto error;
+       }
+
+       if (lttng_strncpy(msg->new_pathname, new_pathname, len)) {
+               ret = -1;
+               ERR("Failed to copy relayd rotate stream command's new path name");
+               goto error;
+       }
+
+       msg->pathname_length = htobe32(len);
+       msg->stream_id = htobe64(stream_id);
+       msg->new_chunk_id = htobe64(new_chunk_id);
+       /*
+        * The seq_num is invalid for metadata streams, but it is ignored on
+        * the relay.
+        */
+       msg->rotate_at_seq_num = htobe64(seq_num);
+
+       /* Send command. */
+       ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0);
+       if (ret < 0) {
+               ERR("Send rotate command");
+               goto error;
+       }
+
+       /* Receive response. */
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               ERR("Receive rotate reply");
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd rotate stream replied error %d", reply.ret_code);
+       } else {
+               /* Success. */
+               ret = 0;
+               DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
+       }
+
+error:
+       free(msg);
+       return ret;
+}
+
 int relayd_rotate_rename(struct lttcomm_relayd_sock *rsock,
                const char *old_path, const char *new_path)
 {
This page took 0.026018 seconds and 5 git commands to generate.