#include "stream.h"
#include "connection.h"
#include "tracefile-array.h"
+#include "tcp_keep_alive.h"
static const char *help_msg =
#ifdef LTTNG_EMBED_HELP
/* command line options */
char *opt_output_path;
static int opt_daemon, opt_background;
+static int opt_allow_clear = 1;
/*
* We need to wait for listener and live listener threads, as well as
{ "verbose", 0, 0, 'v', },
{ "config", 1, 0, 'f' },
{ "version", 0, 0, 'V' },
+ { "disallow-clear", 0, 0, 'x' },
{ NULL, 0, 0, 0, },
};
}
}
break;
+ case 'x':
+ /* Disallow clear */
+ opt_allow_clear = 0;
+ break;
default:
/* Unknown option or other error.
* Error is printed by getopt, just return */
int orig_optopt = optopt, orig_optind = optind;
char *default_address, *optstring;
const char *config_path = NULL;
+ const char *value = NULL;
optstring = utils_generate_optstring(long_options,
sizeof(long_options) / sizeof(struct option));
}
}
+ if (opt_allow_clear) {
+ /* Check if env variable exists. */
+ value = lttng_secure_getenv(DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV);
+ if (value) {
+ ret = config_parse_value(value);
+ if (ret < 0) {
+ ERR("Invalid value for %s specified", DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV);
+ retval = -1;
+ goto exit;
+ }
+ opt_allow_clear = !ret;
+ }
+ }
+
exit:
free(optstring);
return retval;
ret = sock->ops->bind(sock);
if (ret < 0) {
+ PERROR("Failed to bind socket");
goto error;
}
lttcomm_destroy_sock(newsock);
goto error;
}
+
+ ret = socket_apply_keep_alive_config(newsock->fd);
+ if (ret < 0) {
+ ERR("Failed to apply TCP keep-alive configuration on socket (%i)",
+ newsock->fd);
+ lttcomm_destroy_sock(newsock);
+ goto error;
+ }
+
new_conn = connection_create(newsock, type);
if (!new_conn) {
lttcomm_destroy_sock(newsock);
health_code_update();
- while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
+ for (;;) {
health_code_update();
/* Atomically prepare the queue futex */
futex_nto1_prepare(&relay_conn_queue.futex);
+ if (CMM_LOAD_SHARED(dispatch_thread_exit)) {
+ break;
+ }
+
do {
health_code_update();
return ret;
}
+/*
+ * relay_clear_session: clear all data files belonging to a session.
+ */
+static
+int relay_clear_session(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret;
+ ssize_t send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_generic_reply reply;
+
+ memset(&reply, 0, sizeof(reply));
+
+ DBG("Clear session received");
+
+ if (!session || !conn->version_check_done) {
+ ERR("Trying to clear session before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ if (!opt_allow_clear) {
+ ERR("Trying to clear session, but clear is disallowed.");
+ ret = 0;
+ reply.ret_code = htobe32(LTTNG_ERR_CLEAR_RELAY_DISALLOW);
+ goto reply;
+ }
+ ret = session_clear(session);
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+reply:
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+ sizeof(struct lttcomm_relayd_generic_reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"clear session\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ }
+
+end_no_session:
+ return ret;
+}
+
/*
* relay_unknown_command: send -1 if received unknown command
*/
{
int ret;
struct lttcomm_relayd_version reply, msg;
+ bool compatible = true;
conn->version_check_done = 1;
if (reply.major != be32toh(msg.major)) {
DBG("Incompatible major versions (%u vs %u), deleting session",
reply.major, be32toh(msg.major));
- connection_put(conn);
- ret = 0;
- goto end;
+ compatible = false;
}
conn->major = reply.major;
ERR("Relay sending version");
}
+ if (!compatible) {
+ ret = -1;
+ goto end;
+ }
+
DBG("Version check done using protocol %u.%u", conn->major,
conn->minor);
ret = 0;
goto end_stream_put;
} else {
+ DBG("Received index for stream %" PRIu64,
+ stream->stream_handle);
stream->beacon_ts_end = -1ULL;
}
}
ret = relay_index_try_flush(index);
if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
+ /* Clear index and data file(s) if reaching the clear position. */
+ ret = try_stream_clear_index_data(stream);
+ if (ret) {
+ goto end_stream_put;
+ }
} else if (ret > 0) {
/* no flush. */
ret = 0;
} else {
+ /*
+ * ret < 0
+ *
+ * relay_index_try_flush is responsible for the self-reference
+ * put of the index object on error.
+ */
ERR("relay_index_try_flush error %d", ret);
- relay_index_put(index);
ret = -1;
}
+ stream->prev_index_seq = net_seq_num;
end_stream_put:
pthread_mutex_unlock(&stream->lock);
case RELAYD_RESET_METADATA:
ret = relay_reset_metadata(recv_hdr, conn);
break;
+ case RELAYD_CLEAR_SESSION_CUSTOM_EFFICIOS:
+ ret = relay_clear_session(recv_hdr, conn);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
ret = relay_index_try_flush(index);
if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
+ /* Clear index and data file(s) if reaching the clear position. */
+ ret = try_stream_clear_index_data(stream);
+ if (ret) {
+ goto end;
+ }
} else if (ret > 0) {
/* No flush. */
ret = 0;
} else {
- /* Put self-ref for this index due to error. */
- relay_index_put(index);
- index = NULL;
+ /*
+ * ret < 0
+ *
+ * relay_index_try_flush is responsible for the self-reference
+ * put of the index object on error.
+ */
+ ERR("relay_index_try_flush error %d", ret);
ret = -1;
}
end:
net_seq_num = be64toh(data_hdr.net_seq_num);
- DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
+ DBG("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
data_size, stream_id, net_seq_num);
pthread_mutex_lock(&stream->lock);
* rotation.
*/
stream->tracefile_size_current = 0;
+ stream->tracefile_count_current = new_id;
rotate_index = 1;
}
goto exit_options;
}
+ if (!opt_allow_clear) {
+ DBG("Clear command disallowed.");
+ }
+
/* Try to create directory if -o, --output is specified. */
if (opt_output_path) {
if (*opt_output_path != '/') {