Propagate error for clear command
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 2fcc60af00d9d3b29b9809865ed7747ab4dbde45..c9b3c9f549e8bc4e1abe058de5c553d24f157dab 100644 (file)
@@ -70,6 +70,7 @@
 #include "stream.h"
 #include "connection.h"
 #include "tracefile-array.h"
+#include "tcp_keep_alive.h"
 
 static const char *help_msg =
 #ifdef LTTNG_EMBED_HELP
@@ -82,6 +83,7 @@ NULL
 /* command line options */
 char *opt_output_path;
 static int opt_daemon, opt_background;
+static int opt_allow_clear = 1;
 
 /*
  * We need to wait for listener and live listener threads, as well as
@@ -170,6 +172,7 @@ static struct option long_options[] = {
        { "verbose", 0, 0, 'v', },
        { "config", 1, 0, 'f' },
        { "version", 0, 0, 'V' },
+       { "disallow-clear", 0, 0, 'x' },
        { NULL, 0, 0, 0, },
 };
 
@@ -291,6 +294,10 @@ static int set_option(int opt, const char *arg, const char *optname)
                        }
                }
                break;
+       case 'x':
+               /* Disallow clear */
+               opt_allow_clear = 0;
+               break;
        default:
                /* Unknown option or other error.
                 * Error is printed by getopt, just return */
@@ -366,6 +373,7 @@ static int set_options(int argc, char **argv)
        int orig_optopt = optopt, orig_optind = optind;
        char *default_address, *optstring;
        const char *config_path = NULL;
+       const char *value = NULL;
 
        optstring = utils_generate_optstring(long_options,
                        sizeof(long_options) / sizeof(struct option));
@@ -478,6 +486,20 @@ static int set_options(int argc, char **argv)
                }
        }
 
+       if (opt_allow_clear) {
+               /* Check if env variable exists. */
+               value = lttng_secure_getenv(DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV);
+               if (value) {
+                       ret = config_parse_value(value);
+                       if (ret < 0) {
+                               ERR("Invalid value for %s specified", DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV);
+                               retval = -1;
+                               goto exit;
+                       }
+                       opt_allow_clear = !ret;
+               }
+       }
+
 exit:
        free(optstring);
        return retval;
@@ -748,6 +770,7 @@ static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri)
 
        ret = sock->ops->bind(sock);
        if (ret < 0) {
+               PERROR("Failed to bind socket");
                goto error;
        }
 
@@ -899,6 +922,15 @@ restart:
                                        lttcomm_destroy_sock(newsock);
                                        goto error;
                                }
+
+                               ret = socket_apply_keep_alive_config(newsock->fd);
+                               if (ret < 0) {
+                                       ERR("Failed to apply TCP keep-alive configuration on socket (%i)",
+                                                       newsock->fd);
+                                       lttcomm_destroy_sock(newsock);
+                                       goto error;
+                               }
+
                                new_conn = connection_create(newsock, type);
                                if (!new_conn) {
                                        lttcomm_destroy_sock(newsock);
@@ -977,12 +1009,16 @@ static void *relay_thread_dispatcher(void *data)
 
        health_code_update();
 
-       while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
+       for (;;) {
                health_code_update();
 
                /* Atomically prepare the queue futex */
                futex_nto1_prepare(&relay_conn_queue.futex);
 
+               if (CMM_LOAD_SHARED(dispatch_thread_exit)) {
+                       break;
+               }
+
                do {
                        health_code_update();
 
@@ -1415,6 +1451,53 @@ end_no_session:
        return ret;
 }
 
+/*
+ * relay_clear_session: clear all data files belonging to a session.
+ */
+static
+int relay_clear_session(const struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       int ret;
+       ssize_t send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_generic_reply reply;
+
+       memset(&reply, 0, sizeof(reply));
+
+       DBG("Clear session received");
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to clear session before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       if (!opt_allow_clear) {
+               ERR("Trying to clear session, but clear is disallowed.");
+               ret = 0;
+               reply.ret_code = htobe32(LTTNG_ERR_CLEAR_RELAY_DISALLOW);
+               goto reply;
+       }
+       ret = session_clear(session);
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_UNK);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+reply:
+       send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply), 0);
+       if (send_ret < (ssize_t) sizeof(reply)) {
+               ERR("Failed to send \"clear session\" command reply (ret = %zd)",
+                               send_ret);
+               ret = -1;
+       }
+
+end_no_session:
+       return ret;
+}
+
 /*
  * relay_unknown_command: send -1 if received unknown command
  */
@@ -1587,6 +1670,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
 {
        int ret;
        struct lttcomm_relayd_version reply, msg;
+       bool compatible = true;
 
        conn->version_check_done = 1;
 
@@ -1611,9 +1695,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
        if (reply.major != be32toh(msg.major)) {
                DBG("Incompatible major versions (%u vs %u), deleting session",
                                reply.major, be32toh(msg.major));
-               connection_put(conn);
-               ret = 0;
-               goto end;
+               compatible = false;
        }
 
        conn->major = reply.major;
@@ -1632,6 +1714,11 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
                ERR("Relay sending version");
        }
 
+       if (!compatible) {
+               ret = -1;
+               goto end;
+       }
+
        DBG("Version check done using protocol %u.%u", conn->major,
                        conn->minor);
 
@@ -2009,6 +2096,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                ret = 0;
                goto end_stream_put;
        } else {
+               DBG("Received index for stream %" PRIu64,
+                               stream->stream_handle);
                stream->beacon_ts_end = -1ULL;
        }
 
@@ -2029,16 +2118,27 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
+               /* Clear index and data file(s) if reaching the clear position. */
+               ret = try_stream_clear_index_data(stream);
+               if (ret) {
+                       goto end_stream_put;
+               }
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
        } else {
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
                ERR("relay_index_try_flush error %d", ret);
-               relay_index_put(index);
                ret = -1;
        }
+       stream->prev_index_seq = net_seq_num;
 
 end_stream_put:
        pthread_mutex_unlock(&stream->lock);
@@ -2152,6 +2252,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_RESET_METADATA:
                ret = relay_reset_metadata(recv_hdr, conn);
                break;
+       case RELAYD_CLEAR_SESSION_CUSTOM_EFFICIOS:
+               ret = relay_clear_session(recv_hdr, conn);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
@@ -2230,15 +2333,24 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
+               /* Clear index and data file(s) if reaching the clear position. */
+               ret = try_stream_clear_index_data(stream);
+               if (ret) {
+                       goto end;
+               }
        } else if (ret > 0) {
                /* No flush. */
                ret = 0;
        } else {
-               /* Put self-ref for this index due to error. */
-               relay_index_put(index);
-               index = NULL;
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
+               ERR("relay_index_try_flush error %d", ret);
                ret = -1;
        }
 end:
@@ -2288,7 +2400,7 @@ static int relay_process_data(struct relay_connection *conn)
 
        net_seq_num = be64toh(data_hdr.net_seq_num);
 
-       DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
+       DBG("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
                data_size, stream_id, net_seq_num);
 
        pthread_mutex_lock(&stream->lock);
@@ -2319,6 +2431,7 @@ static int relay_process_data(struct relay_connection *conn)
                 * rotation.
                 */
                stream->tracefile_size_current = 0;
+               stream->tracefile_count_current = new_id;
                rotate_index = 1;
        }
 
@@ -2756,6 +2869,10 @@ int main(int argc, char **argv)
                goto exit_options;
        }
 
+       if (!opt_allow_clear) {
+               DBG("Clear command disallowed.");
+       }
+
        /* Try to create directory if -o, --output is specified. */
        if (opt_output_path) {
                if (*opt_output_path != '/') {
This page took 0.029423 seconds and 5 git commands to generate.