return ret;
}
+/*
+ * relay_clear_session: clear all data files belonging to a session.
+ */
+static
+int relay_clear_session(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret;
+ ssize_t send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_generic_reply reply;
+
+ DBG("Clear session received");
+
+ if (!session || !conn->version_check_done) {
+ ERR("Trying to clear session before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ if (!opt_allow_clear) {
+ ERR("Trying to clear session, but clear is disallowed.");
+ ret = -1;
+ goto end_no_session;
+ }
+ ret = session_clear(session);
+
+ memset(&reply, 0, sizeof(reply));
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+ sizeof(struct lttcomm_relayd_generic_reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"clear session\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ }
+
+end_no_session:
+ return ret;
+}
+
/*
* relay_unknown_command: send -1 if received unknown command
*/
ret = 0;
goto end_stream_put;
} else {
+ DBG("Received index for stream %" PRIu64,
+ stream->stream_handle);
stream->beacon_ts_end = -1ULL;
}
}
ret = relay_index_try_flush(index);
if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
+ /* Clear index and data file(s) if reaching the clear position. */
+ ret = try_stream_clear_index_data(stream);
+ if (ret) {
+ goto end_stream_put;
+ }
} else if (ret > 0) {
/* no flush. */
ret = 0;
ERR("relay_index_try_flush error %d", ret);
ret = -1;
}
+ stream->prev_index_seq = net_seq_num;
end_stream_put:
pthread_mutex_unlock(&stream->lock);
case RELAYD_RESET_METADATA:
ret = relay_reset_metadata(recv_hdr, conn);
break;
+ case RELAYD_CLEAR_SESSION_CUSTOM_EFFICIOS:
+ ret = relay_clear_session(recv_hdr, conn);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
ret = relay_index_try_flush(index);
if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
+ /* Clear index and data file(s) if reaching the clear position. */
+ ret = try_stream_clear_index_data(stream);
+ if (ret) {
+ goto end;
+ }
} else if (ret > 0) {
/* No flush. */
ret = 0;
net_seq_num = be64toh(data_hdr.net_seq_num);
- DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
+ DBG("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
data_size, stream_id, net_seq_num);
pthread_mutex_lock(&stream->lock);
* rotation.
*/
stream->tracefile_size_current = 0;
+ stream->tracefile_count_current = new_id;
rotate_index = 1;
}