return ret;
}
+/*
+ * relay_rotate_stream: rotate a stream to a new tracefile.
+ */
+static int relay_rotate_stream(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret, send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_rotate_stream stream_info;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ size_t len;
+
+ DBG("Rotate stream received");
+
+ if (!session || conn->version_check_done == 0) {
+ ERR("Trying to rotate a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ if (session->minor < 11) {
+ ERR("Unsupported feature before 2.11");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &stream_info,
+ sizeof(stream_info), 0);
+ if (ret < sizeof(stream_info)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid rotate_stream struct size : %d", ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ stream = stream_get_by_id(be64toh(stream_info.stream_id));
+ if (!stream) {
+ ret = -1;
+ goto end;
+ }
+
+ len = lttng_strnlen(stream_info.new_pathname,
+ sizeof(stream_info.new_pathname));
+ /* Ensure that NULL-terminated and fits in local filename length. */
+ if (len == sizeof(stream_info.new_pathname) || len >= LTTNG_NAME_MAX) {
+ ret = -ENAMETOOLONG;
+ ERR("Path name too long");
+ goto end;
+ }
+
+ fprintf(stderr, "Rotating stream %lu to %s\n", stream_info.stream_id,
+ stream_info.new_pathname);
+#if 0
+ /*
+ * Set last_net_seq_num before the close flag. Required by data
+ * pending check.
+ */
+ pthread_mutex_lock(&stream->lock);
+ stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+ pthread_mutex_unlock(&stream->lock);
+
+ /*
+ * This is one of the conditions which may trigger a stream close
+ * with the others being:
+ * 1) A close command is received for a stream
+ * 2) The control connection owning the stream is closed
+ * 3) We have received all of the stream's data _after_ a close
+ * request.
+ */
+ try_stream_close(stream);
+ if (stream->is_metadata) {
+ struct relay_viewer_stream *vstream;
+
+ vstream = viewer_stream_get_by_id(stream->stream_handle);
+ if (vstream) {
+ if (vstream->metadata_sent == stream->metadata_received) {
+ /*
+ * Since all the metadata has been sent to the
+ * viewer and that we have a request to close
+ * its stream, we can safely teardown the
+ * corresponding metadata viewer stream.
+ */
+ viewer_stream_put(vstream);
+ }
+ /* Put local reference. */
+ viewer_stream_put(vstream);
+ }
+ }
+#endif
+ stream_put(stream);
+
+end:
+ memset(&reply, 0, sizeof(reply));
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+ sizeof(struct lttcomm_relayd_generic_reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending stream id");
+ ret = send_ret;
+ }
+
+end_no_session:
+ return ret;
+}
+
+
/*
* Process the commands received on the control socket
*/
case RELAYD_RESET_METADATA:
ret = relay_reset_metadata(recv_hdr, conn);
break;
+ case RELAYD_ROTATE_STREAM:
+ ret = relay_rotate_stream(recv_hdr, conn);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
}
/*
- * Performs the stream rotation for the rotate session feature if needed.
- * It must be called with the stream lock held.
- *
- * Return 0 on success, a negative number of error.
+ * Perform the rotation a local stream file.
*/
-int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+int rotate_local_stream(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream)
{
int ret;
}
}
- lttng_consumer_reset_stream_rotate_state(stream);
-
ret = 0;
goto end;
ret = -1;
end:
return ret;
+
+}
+
+/*
+ * Perform the rotation a stream file on the relay.
+ */
+int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
+ char *new_pathname = NULL;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (!relayd) {
+ ERR("Failed to find relayd");
+ ret = -1;
+ goto end;
+ }
+
+ new_pathname = zmalloc(LTTNG_PATH_MAX * sizeof(char));
+ if (!new_pathname) {
+ ret = -ENOMEM;
+ goto end;
+ }
+
+ ret = snprintf(new_pathname, LTTNG_PATH_MAX, "%s/%s",
+ stream->channel_ro_pathname, stream->name);
+ if (ret < 0) {
+ PERROR("snprintf stream name");
+ goto end;
+ }
+
+ ret = relayd_rotate_stream(&relayd->control_sock,
+ stream->relayd_stream_id, new_pathname);
+
+end:
+ free(new_pathname);
+ return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ ret = rotate_relay_stream(ctx, stream);
+ } else {
+ ret = rotate_local_stream(ctx, stream);
+ }
+ lttng_consumer_reset_stream_rotate_state(stream);
+
+ return ret;
}
/*
error:
return ret;
}
+
+int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
+ const char *new_pathname)
+{
+ int ret;
+ struct lttcomm_relayd_rotate_stream msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(rsock);
+
+ DBG("Relayd rotating stream id %" PRIu64, stream_id);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.stream_id = htobe64(stream_id);
+ if (lttng_strncpy(msg.new_pathname, new_pathname,
+ sizeof(msg.new_pathname))) {
+ ret = -1;
+ goto error;
+ }
+
+ /* Send command */
+ ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) &msg, sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Receive response */
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd rotate stream replied error %d", reply.ret_code);
+ } else {
+ /* Success */
+ ret = 0;
+ }
+
+ DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
+
+error:
+ return ret;
+
+}
uint64_t net_seq_num);
int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
uint64_t stream_id, uint64_t version);
+int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
+ const char *new_pathname);
#endif /* _RELAYD_H */
uint64_t version;
} LTTNG_PACKED;
+struct lttcomm_relayd_rotate_stream {
+ uint64_t stream_id;
+ char new_pathname[LTTNG_PATH_MAX];
+} LTTNG_PACKED;
+
#endif /* _RELAYD_COMM */
RELAYD_STREAMS_SENT = 16,
/* Ask the relay to reset the metadata trace file (2.8+) */
RELAYD_RESET_METADATA = 17,
- RELAYD_ROTATE = 18,
+ /* Ask the relay to rotate a stream file (2.11+) */
+ RELAYD_ROTATE_STREAM = 18,
};
/*