relayd: Implement custom EfficiOS session clear
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 705b5d6c52247be9e1d318b7a8dd94ec39123129..0e95c948173659496c476823a330af4ab9f37ad2 100644 (file)
@@ -1451,6 +1451,51 @@ end_no_session:
        return ret;
 }
 
+/*
+ * relay_clear_session: clear all data files belonging to a session.
+ */
+static
+int relay_clear_session(const struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       int ret;
+       ssize_t send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_generic_reply reply;
+
+       DBG("Clear session received");
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to clear session before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       if (!opt_allow_clear) {
+               ERR("Trying to clear session, but clear is disallowed.");
+               ret = -1;
+               goto end_no_session;
+       }
+       ret = session_clear(session);
+
+       memset(&reply, 0, sizeof(reply));
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_UNK);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+       send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply), 0);
+       if (send_ret < (ssize_t) sizeof(reply)) {
+               ERR("Failed to send \"clear session\" command reply (ret = %zd)",
+                               send_ret);
+               ret = -1;
+       }
+
+end_no_session:
+       return ret;
+}
+
 /*
  * relay_unknown_command: send -1 if received unknown command
  */
@@ -2049,6 +2094,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                ret = 0;
                goto end_stream_put;
        } else {
+               DBG("Received index for stream %" PRIu64,
+                               stream->stream_handle);
                stream->beacon_ts_end = -1ULL;
        }
 
@@ -2069,8 +2116,13 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
+               /* Clear index and data file(s) if reaching the clear position. */
+               ret = try_stream_clear_index_data(stream);
+               if (ret) {
+                       goto end_stream_put;
+               }
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
@@ -2084,6 +2136,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                ERR("relay_index_try_flush error %d", ret);
                ret = -1;
        }
+       stream->prev_index_seq = net_seq_num;
 
 end_stream_put:
        pthread_mutex_unlock(&stream->lock);
@@ -2197,6 +2250,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_RESET_METADATA:
                ret = relay_reset_metadata(recv_hdr, conn);
                break;
+       case RELAYD_CLEAR_SESSION_CUSTOM_EFFICIOS:
+               ret = relay_clear_session(recv_hdr, conn);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
@@ -2275,8 +2331,13 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
+               /* Clear index and data file(s) if reaching the clear position. */
+               ret = try_stream_clear_index_data(stream);
+               if (ret) {
+                       goto end;
+               }
        } else if (ret > 0) {
                /* No flush. */
                ret = 0;
@@ -2337,7 +2398,7 @@ static int relay_process_data(struct relay_connection *conn)
 
        net_seq_num = be64toh(data_hdr.net_seq_num);
 
-       DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
+       DBG("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
                data_size, stream_id, net_seq_num);
 
        pthread_mutex_lock(&stream->lock);
@@ -2368,6 +2429,7 @@ static int relay_process_data(struct relay_connection *conn)
                 * rotation.
                 */
                stream->tracefile_size_current = 0;
+               stream->tracefile_count_current = new_id;
                rotate_index = 1;
        }
 
This page took 0.028497 seconds and 5 git commands to generate.