X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=c9b3c9f549e8bc4e1abe058de5c553d24f157dab;hb=4852cdb954cd0d685bd4be7e69abfeb3e5388cc6;hp=2fcc60af00d9d3b29b9809865ed7747ab4dbde45;hpb=4fc83d948cea6b10484e65f004a6c167e71ac440;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 2fcc60af0..c9b3c9f54 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -70,6 +70,7 @@ #include "stream.h" #include "connection.h" #include "tracefile-array.h" +#include "tcp_keep_alive.h" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -82,6 +83,7 @@ NULL /* command line options */ char *opt_output_path; static int opt_daemon, opt_background; +static int opt_allow_clear = 1; /* * We need to wait for listener and live listener threads, as well as @@ -170,6 +172,7 @@ static struct option long_options[] = { { "verbose", 0, 0, 'v', }, { "config", 1, 0, 'f' }, { "version", 0, 0, 'V' }, + { "disallow-clear", 0, 0, 'x' }, { NULL, 0, 0, 0, }, }; @@ -291,6 +294,10 @@ static int set_option(int opt, const char *arg, const char *optname) } } break; + case 'x': + /* Disallow clear */ + opt_allow_clear = 0; + break; default: /* Unknown option or other error. * Error is printed by getopt, just return */ @@ -366,6 +373,7 @@ static int set_options(int argc, char **argv) int orig_optopt = optopt, orig_optind = optind; char *default_address, *optstring; const char *config_path = NULL; + const char *value = NULL; optstring = utils_generate_optstring(long_options, sizeof(long_options) / sizeof(struct option)); @@ -478,6 +486,20 @@ static int set_options(int argc, char **argv) } } + if (opt_allow_clear) { + /* Check if env variable exists. */ + value = lttng_secure_getenv(DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV); + if (value) { + ret = config_parse_value(value); + if (ret < 0) { + ERR("Invalid value for %s specified", DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV); + retval = -1; + goto exit; + } + opt_allow_clear = !ret; + } + } + exit: free(optstring); return retval; @@ -748,6 +770,7 @@ static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri) ret = sock->ops->bind(sock); if (ret < 0) { + PERROR("Failed to bind socket"); goto error; } @@ -899,6 +922,15 @@ restart: lttcomm_destroy_sock(newsock); goto error; } + + ret = socket_apply_keep_alive_config(newsock->fd); + if (ret < 0) { + ERR("Failed to apply TCP keep-alive configuration on socket (%i)", + newsock->fd); + lttcomm_destroy_sock(newsock); + goto error; + } + new_conn = connection_create(newsock, type); if (!new_conn) { lttcomm_destroy_sock(newsock); @@ -977,12 +1009,16 @@ static void *relay_thread_dispatcher(void *data) health_code_update(); - while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { + for (;;) { health_code_update(); /* Atomically prepare the queue futex */ futex_nto1_prepare(&relay_conn_queue.futex); + if (CMM_LOAD_SHARED(dispatch_thread_exit)) { + break; + } + do { health_code_update(); @@ -1415,6 +1451,53 @@ end_no_session: return ret; } +/* + * relay_clear_session: clear all data files belonging to a session. + */ +static +int relay_clear_session(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn) +{ + int ret; + ssize_t send_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_generic_reply reply; + + memset(&reply, 0, sizeof(reply)); + + DBG("Clear session received"); + + if (!session || !conn->version_check_done) { + ERR("Trying to clear session before version check"); + ret = -1; + goto end_no_session; + } + + if (!opt_allow_clear) { + ERR("Trying to clear session, but clear is disallowed."); + ret = 0; + reply.ret_code = htobe32(LTTNG_ERR_CLEAR_RELAY_DISALLOW); + goto reply; + } + ret = session_clear(session); + if (ret < 0) { + reply.ret_code = htobe32(LTTNG_ERR_UNK); + } else { + reply.ret_code = htobe32(LTTNG_OK); + } +reply: + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"clear session\" command reply (ret = %zd)", + send_ret); + ret = -1; + } + +end_no_session: + return ret; +} + /* * relay_unknown_command: send -1 if received unknown command */ @@ -1587,6 +1670,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, { int ret; struct lttcomm_relayd_version reply, msg; + bool compatible = true; conn->version_check_done = 1; @@ -1611,9 +1695,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, if (reply.major != be32toh(msg.major)) { DBG("Incompatible major versions (%u vs %u), deleting session", reply.major, be32toh(msg.major)); - connection_put(conn); - ret = 0; - goto end; + compatible = false; } conn->major = reply.major; @@ -1632,6 +1714,11 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, ERR("Relay sending version"); } + if (!compatible) { + ret = -1; + goto end; + } + DBG("Version check done using protocol %u.%u", conn->major, conn->minor); @@ -2009,6 +2096,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, ret = 0; goto end_stream_put; } else { + DBG("Received index for stream %" PRIu64, + stream->stream_handle); stream->beacon_ts_end = -1ULL; } @@ -2029,16 +2118,27 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, } ret = relay_index_try_flush(index); if (ret == 0) { - tracefile_array_commit_seq(stream->tfa); + tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount); stream->index_received_seqcount++; + /* Clear index and data file(s) if reaching the clear position. */ + ret = try_stream_clear_index_data(stream); + if (ret) { + goto end_stream_put; + } } else if (ret > 0) { /* no flush. */ ret = 0; } else { + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ ERR("relay_index_try_flush error %d", ret); - relay_index_put(index); ret = -1; } + stream->prev_index_seq = net_seq_num; end_stream_put: pthread_mutex_unlock(&stream->lock); @@ -2152,6 +2252,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_RESET_METADATA: ret = relay_reset_metadata(recv_hdr, conn); break; + case RELAYD_CLEAR_SESSION_CUSTOM_EFFICIOS: + ret = relay_clear_session(recv_hdr, conn); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); @@ -2230,15 +2333,24 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, ret = relay_index_try_flush(index); if (ret == 0) { - tracefile_array_commit_seq(stream->tfa); + tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount); stream->index_received_seqcount++; + /* Clear index and data file(s) if reaching the clear position. */ + ret = try_stream_clear_index_data(stream); + if (ret) { + goto end; + } } else if (ret > 0) { /* No flush. */ ret = 0; } else { - /* Put self-ref for this index due to error. */ - relay_index_put(index); - index = NULL; + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ + ERR("relay_index_try_flush error %d", ret); ret = -1; } end: @@ -2288,7 +2400,7 @@ static int relay_process_data(struct relay_connection *conn) net_seq_num = be64toh(data_hdr.net_seq_num); - DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64, + DBG("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64, data_size, stream_id, net_seq_num); pthread_mutex_lock(&stream->lock); @@ -2319,6 +2431,7 @@ static int relay_process_data(struct relay_connection *conn) * rotation. */ stream->tracefile_size_current = 0; + stream->tracefile_count_current = new_id; rotate_index = 1; } @@ -2756,6 +2869,10 @@ int main(int argc, char **argv) goto exit_options; } + if (!opt_allow_clear) { + DBG("Clear command disallowed."); + } + /* Try to create directory if -o, --output is specified. */ if (opt_output_path) { if (*opt_output_path != '/') {