static struct relay_conn_queue relay_conn_queue;
/* Cap of file descriptors to be in simultaneous use by the relay daemon. */
-static unsigned int lttng_opt_fd_cap;
+static unsigned int lttng_opt_fd_pool_size = -1;
/* Global relay stream hash table. */
struct lttng_ht *relay_streams_ht;
{ "daemonize", 0, 0, 'd', },
{ "background", 0, 0, 'b', },
{ "group", 1, 0, 'g', },
- { "fd-cap", 1, 0, '\0', },
+ { "fd-pool-size", 1, 0, '\0', },
{ "help", 0, 0, 'h', },
{ "output", 1, 0, 'o', },
{ "verbose", 0, 0, 'v', },
switch (opt) {
case 0:
- if (!strcmp(optname, "fd-cap")) {
+ if (!strcmp(optname, "fd-pool-size")) {
unsigned long v;
errno = 0;
v = strtoul(arg, NULL, 0);
if (errno != 0 || !isdigit(arg[0])) {
- ERR("Wrong value in --fd-cap parameter: %s", arg);
+ ERR("Wrong value in --fd-pool-size parameter: %s", arg);
ret = -1;
goto end;
}
- if (v < DEFAULT_RELAYD_MINIMAL_FD_CAP) {
- ERR("File descriptor cap must be set to at least %d",
- DEFAULT_RELAYD_MINIMAL_FD_CAP);
- }
if (v >= UINT_MAX) {
- ERR("File descriptor cap overflow in --fd-cap parameter: %s", arg);
+ ERR("File descriptor cap overflow in --fd-pool-size parameter: %s", arg);
ret = -1;
goto end;
}
- lttng_opt_fd_cap = (unsigned int) v;
- DBG3("File descriptor cap set to %u", lttng_opt_fd_cap);
-
+ lttng_opt_fd_pool_size = (unsigned int) v;
} else {
fprintf(stderr, "unknown option %s", optname);
if (arg) {
}
}
+/*
+ * Set the global file descriptor pool size (lttng_opt_fd_pool_size) either
+ * from the user-provided --fd-pool-size argument or, when unset (-1), from
+ * the process' soft RLIMIT_NOFILE minus a reserve kept for transient fd
+ * usage.
+ *
+ * Returns 0 on success, -1 on error (getrlimit() failure, limit too low,
+ * or argument out of the [min, soft limit] range).
+ */
+static int set_fd_pool_size(void)
+{
+	int ret = 0;
+	struct rlimit rlimit;
+
+	ret = getrlimit(RLIMIT_NOFILE, &rlimit);
+	if (ret) {
+		PERROR("Failed to get file descriptor limit");
+		ret = -1;
+		goto end;
+	}
+
+	/* rlim_t's width is platform-dependent; cast to uint64_t to print. */
+	DBG("File descriptor count limits are %" PRIu64 " (soft) and %" PRIu64 " (hard)",
+			(uint64_t) rlimit.rlim_cur,
+			(uint64_t) rlimit.rlim_max);
+	if (lttng_opt_fd_pool_size == -1) {
+		/* Use default value (soft limit - reserve). */
+		if (rlimit.rlim_cur < DEFAULT_RELAYD_MIN_FD_POOL_SIZE) {
+			ERR("The process' file number limit is too low (%" PRIu64 "). The process' file number limit must be set to at least %i.",
+					(uint64_t) rlimit.rlim_cur,
+					DEFAULT_RELAYD_MIN_FD_POOL_SIZE);
+			ret = -1;
+			goto end;
+		}
+		lttng_opt_fd_pool_size = rlimit.rlim_cur -
+				DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE;
+		goto end;
+	}
+
+	if (lttng_opt_fd_pool_size < DEFAULT_RELAYD_MIN_FD_POOL_SIZE) {
+		ERR("File descriptor pool size must be set to at least %d",
+				DEFAULT_RELAYD_MIN_FD_POOL_SIZE);
+		ret = -1;
+		goto end;
+	}
+
+	if (lttng_opt_fd_pool_size > rlimit.rlim_cur) {
+		ERR("File descriptor pool size argument (%u) exceeds the process' soft limit (%" PRIu64 ").",
+				lttng_opt_fd_pool_size,
+				(uint64_t) rlimit.rlim_cur);
+		ret = -1;
+		goto end;
+	}
+
+	DBG("File descriptor pool size argument (%u) adjusted to %u to accommodate transient fd uses",
+			lttng_opt_fd_pool_size,
+			lttng_opt_fd_pool_size - DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE);
+	lttng_opt_fd_pool_size -= DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE;
+end:
+	return ret;
+}
+
static int set_options(int argc, char **argv)
{
int c, ret = 0, option_index = 0, retval = 0;
goto exit;
}
}
- if (lttng_opt_fd_cap == 0) {
- int ret;
- struct rlimit rlimit;
-
- ret = getrlimit(RLIMIT_NOFILE, &rlimit);
- if (ret) {
- PERROR("Failed to get file descriptor limit");
- retval = -1;
- }
-
- lttng_opt_fd_cap = rlimit.rlim_cur;
+ ret = set_fd_pool_size();
+ if (ret) {
+ retval = -1;
+ goto exit;
}
if (!opt_group_output_by_session && !opt_group_output_by_host) {
return 0;
}
+/*
+ * fd-tracker "open" callback: create the socket wrapped by 'data' (a
+ * struct lttcomm_sock) and, on success, expose its file descriptor
+ * through 'out_fd'. Returns lttcomm_create_sock()'s return code.
+ */
+static int create_sock(void *data, int *out_fd)
+{
+	struct lttcomm_sock *sock = data;
+	const int ret = lttcomm_create_sock(sock);
+
+	if (ret >= 0) {
+		*out_fd = sock->fd;
+	}
+
+	return ret;
+}
+
+/*
+ * fd-tracker "close" callback: close the socket wrapped by 'data'.
+ * 'in_fd' is unused since the fd is owned and closed by the
+ * lttcomm_sock's own close operation.
+ */
+static int close_sock(void *data, int *in_fd)
+{
+	struct lttcomm_sock *const sock = data;
+
+	return sock->ops->close(sock);
+}
+
+/*
+ * fd-tracker "open" callback: accept a connection on a listening socket.
+ *
+ * 'data' is an array of two struct lttcomm_sock pointers: element [0] is
+ * the listening (input) socket; element [1] is written by this function
+ * and receives the newly-accepted socket. The accepted socket's fd is
+ * returned through 'out_fd'. Returns 0 on success, -1 if accept fails.
+ */
+static int accept_sock(void *data, int *out_fd)
+{
+	int ret = 0;
+	/* Socks is an array of in_sock, out_sock. */
+	struct lttcomm_sock **socks = data;
+	struct lttcomm_sock *in_sock = socks[0];
+
+	socks[1] = in_sock->ops->accept(in_sock);
+	if (!socks[1]) {
+		ret = -1;
+		goto end;
+	}
+	*out_fd = socks[1]->fd;
+end:
+	return ret;
+}
+
/*
* Create and init socket from uri.
*/
-static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri)
+static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri,
+ const char *name)
{
- int ret;
+ int ret, sock_fd;
struct lttcomm_sock *sock = NULL;
+ char uri_str[PATH_MAX];
+ char *formated_name = NULL;
sock = lttcomm_alloc_sock_from_uri(uri);
if (sock == NULL) {
goto error;
}
- ret = lttcomm_create_sock(sock);
- if (ret < 0) {
- goto error;
+ /*
+ * Don't fail to create the socket if the name can't be built as it is
+ * only used for debugging purposes.
+ */
+ ret = uri_to_str_url(uri, uri_str, sizeof(uri_str));
+ uri_str[sizeof(uri_str) - 1] = '\0';
+ if (ret >= 0) {
+ ret = asprintf(&formated_name, "%s socket @ %s", name,
+ uri_str);
+ if (ret < 0) {
+ formated_name = NULL;
+ }
}
- DBG("Listening on sock %d", sock->fd);
+
+ ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd,
+ (const char **) (formated_name ? &formated_name : NULL),
+ 1, create_sock, sock);
+ free(formated_name);
+ DBG("Listening on %s socket %d", name, sock->fd);
ret = sock->ops->bind(sock);
if (ret < 0) {
return NULL;
}
+/*
+ * Accept a connection on 'listening_sock' through the fd-tracker so the
+ * resulting file descriptor is accounted against the fd pool. 'name' is
+ * used for fd-tracker bookkeeping and debug logging only.
+ *
+ * Returns the newly-accepted socket, or NULL on error.
+ */
+static
+struct lttcomm_sock *accept_relayd_sock(struct lttcomm_sock *listening_sock,
+		const char *name)
+{
+	int out_fd, ret;
+	struct lttcomm_sock *socks[2] = { listening_sock, NULL };
+	struct lttcomm_sock *new_sock = NULL;
+
+	/*
+	 * Pass the array itself (decays to struct lttcomm_sock **), not its
+	 * address: accept_sock() reinterprets 'data' as an array of socket
+	 * pointers, so a pointer-to-array would only work by address
+	 * coincidence.
+	 */
+	ret = fd_tracker_open_unsuspendable_fd(
+			the_fd_tracker, &out_fd,
+			(const char **) &name,
+			1, accept_sock, socks);
+	if (ret) {
+		goto end;
+	}
+	new_sock = socks[1];
+	DBG("%s accepted, socket %d", name, new_sock->fd);
+end:
+	return new_sock;
+}
+
/*
* This thread manages the listening for new connections on the network
*/
health_code_update();
- control_sock = relay_socket_create(control_uri);
+ control_sock = relay_socket_create(control_uri, "Control listener");
if (!control_sock) {
goto error_sock_control;
}
- data_sock = relay_socket_create(data_uri);
+ data_sock = relay_socket_create(data_uri, "Data listener");
if (!data_sock) {
goto error_sock_relay;
}
*/
int val = 1;
struct relay_connection *new_conn;
- struct lttcomm_sock *newsock;
+ struct lttcomm_sock *newsock = NULL;
enum connection_type type;
if (pollfd == data_sock->fd) {
type = RELAY_DATA;
- newsock = data_sock->ops->accept(data_sock);
- DBG("Relay data connection accepted, socket %d",
- newsock->fd);
+ newsock = accept_relayd_sock(data_sock,
+ "Data socket to relayd");
} else {
assert(pollfd == control_sock->fd);
type = RELAY_CONTROL;
- newsock = control_sock->ops->accept(control_sock);
- DBG("Relay control connection accepted, socket %d",
- newsock->fd);
+ newsock = accept_relayd_sock(control_sock,
+ "Control socket to relayd");
}
if (!newsock) {
PERROR("accepting sock");
(void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
error_create_poll:
if (data_sock->fd >= 0) {
- ret = data_sock->ops->close(data_sock);
+ ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
+ &data_sock->fd, 1, close_sock,
+ data_sock);
if (ret) {
PERROR("close");
}
lttcomm_destroy_sock(data_sock);
error_sock_relay:
if (control_sock->fd >= 0) {
- ret = control_sock->ops->close(control_sock);
+ ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
+ &control_sock->fd, 1, close_sock,
+ control_sock);
if (ret) {
PERROR("close");
}
goto end_unlock;
}
- ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
- 0, 0, -1, -1, stream->stream_fd->fd, NULL,
- &stream->stream_fd->fd);
+ ret = stream_fd_rotate(stream->stream_fd,
+ stream->path_name, stream->channel_name, 0, 0, NULL);
if (ret < 0) {
ERR("Failed to rotate metadata file %s of channel %s",
stream->path_name, stream->channel_name);
struct lttcomm_relayd_metadata_payload metadata_payload_header;
struct relay_stream *metadata_stream;
uint64_t metadata_payload_size;
+ int metadata_fd = -1;
if (!session) {
ERR("Metadata sent before version check");
pthread_mutex_lock(&metadata_stream->lock);
- size_ret = lttng_write(metadata_stream->stream_fd->fd,
+ metadata_fd = stream_fd_get_fd(metadata_stream->stream_fd);
+ if (metadata_fd < 0) {
+ goto end_put;
+ }
+ size_ret = lttng_write(metadata_fd,
payload->data + sizeof(metadata_payload_header),
metadata_payload_size);
if (size_ret < metadata_payload_size) {
ERR("Relay error writing metadata on file");
ret = -1;
- goto end_put;
+ goto end_put_fd;
}
- size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+ size_ret = write_padding_to_file(metadata_fd,
metadata_payload_header.padding_size);
if (size_ret < (int64_t) metadata_payload_header.padding_size) {
ret = -1;
- goto end_put;
+ goto end_put_fd;
}
metadata_stream->metadata_received +=
DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
metadata_stream->metadata_received);
+end_put_fd:
+ stream_fd_put_fd(metadata_stream->stream_fd);
end_put:
pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
/* no flush. */
ret = 0;
} else {
+ /*
+ * ret < 0
+ *
+ * relay_index_try_flush is responsible for the self-reference
+ * put of the index object on error.
+ */
ERR("relay_index_try_flush error %d", ret);
- relay_index_put(index);
ret = -1;
}
/* Put ref on previous index_file. */
if (stream->index_file) {
- lttng_index_file_put(stream->index_file);
+ relay_index_file_put(stream->index_file);
stream->index_file = NULL;
}
major = stream->trace->session->major;
minor = stream->trace->session->minor;
- stream->index_file = lttng_index_file_create(stream->path_name,
+ stream->index_file = relay_index_file_create(stream->path_name,
stream->channel_name,
- -1, -1, stream->tracefile_size,
+ stream->tracefile_size,
tracefile_array_get_file_index_head(stream->tfa),
lttng_to_index_major(major, minor),
lttng_to_index_minor(major, minor));
/* No flush. */
ret = 0;
} else {
- /* Put self-ref for this index due to error. */
- relay_index_put(index);
- index = NULL;
+ /*
+ * ret < 0
+ *
+ * relay_index_try_flush is responsible for the self-reference
+ * put of the index object on error.
+ */
+ ERR("relay_index_try_flush error %d", ret);
ret = -1;
}
end:
/* new_id is updated by utils_rotate_stream_file. */
new_id = old_id;
- ret = utils_rotate_stream_file(stream->path_name,
+ ret = stream_fd_rotate(stream->stream_fd, stream->path_name,
stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- &new_id, &stream->stream_fd->fd);
+ stream->tracefile_count, &new_id);
if (ret < 0) {
ERR("Failed to rotate stream output file");
status = RELAY_CONNECTION_STATUS_ERROR;
bool new_stream = false, close_requested = false;
uint64_t left_to_receive = state->left_to_receive;
struct relay_session *session;
+ int stream_fd = -1;
DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->header.net_seq_num,
}
}
+ stream_fd = stream_fd_get_fd(stream->stream_fd);
+ if (stream_fd < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+
/*
* The size of the "chunk" received on any iteration is bounded by:
* - the data left to receive,
PERROR("Socket %d error", conn->sock->fd);
status = RELAY_CONNECTION_STATUS_ERROR;
}
- goto end_stream_unlock;
+ goto end_put_fd;
} else if (ret == 0) {
/* No more data ready to be consumed on socket. */
DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
recv_size = ret;
/* Write data to stream output fd. */
- write_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+ write_ret = lttng_write(stream_fd, data_buffer,
recv_size);
if (write_ret < (ssize_t) recv_size) {
ERR("Relay error writing data to file");
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
+ goto end_put_fd;
}
left_to_receive -= recv_size;
DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->received,
state->left_to_receive);
- goto end_stream_unlock;
+ goto end_put_fd;
}
- ret = write_padding_to_file(stream->stream_fd->fd,
+ ret = write_padding_to_file(stream_fd,
state->header.padding_size);
if ((int64_t) ret < (int64_t) state->header.padding_size) {
ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
+ goto end_put_fd;
}
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
+ goto end_put_fd;
}
}
connection_reset_protocol_state(conn);
state = NULL;
+end_put_fd:
+ stream_fd_put_fd(stream->stream_fd);
end_stream_unlock:
close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);
(void) lttng_poll_del(events, pollfd);
- ret = close(pollfd);
+ ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1,
+ fd_tracker_util_close_fd, NULL);
if (ret < 0) {
ERR("Closing pollfd %d", pollfd);
}
*/
static void *relay_thread_worker(void *data)
{
- int ret, err = -1, last_seen_data_fd = -1;
+ int ret, err = -1;
uint32_t nb_fd;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
struct lttng_ht_iter iter;
- struct relay_connection *destroy_conn = NULL;
+ struct relay_connection *tmp_conn = NULL;
+ uint64_t relay_conn_pipe_activity_phase = 0;
+ uint64_t current_activity_phase = 1;
DBG("[thread] Relay worker started");
restart:
while (1) {
- int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+ int i;
+ bool at_least_one_event_processed = false;
health_code_update();
nb_fd = ret;
/*
- * Process control. The control connection is
- * prioritized so we don't starve it with high
- * throughput tracing data on the data connection.
+ * Processes FDs that were not yet active during the
+ * current activity phase. Increment the activity phase when all
+ * events are already processed for the current activity
+			 * phase. This results in fairness across all connections and
+ * connection pipe.
*/
for (i = 0; i < nb_fd; i++) {
/* Fetch once the poll data */
if (revents & LPOLLIN) {
struct relay_connection *conn;
+ if (relay_conn_pipe_activity_phase == current_activity_phase) {
+ /*
+ * Only consider once per
+ * activity phase for fairness.
+ */
+ DBG("Skipping adding reception, already happened in activity phase %" PRIu64, current_activity_phase);
+ continue;
+ }
+
ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
if (ret < 0) {
goto error;
}
+ /*
+ * For now we prefer fairness over
+ * "immediate" action of new
+ * connection. Set activity phase to the
+ * current phase.
+ */
+ conn->activity_phase = current_activity_phase;
lttng_poll_add(&events, conn->sock->fd,
LPOLLIN | LPOLLRDHUP);
connection_ht_add(relay_connections_ht, conn);
DBG("Connection socket %d added", conn->sock->fd);
+ relay_conn_pipe_activity_phase = current_activity_phase;
+ at_least_one_event_processed = true;
} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
ERR("Relay connection pipe error");
goto error;
goto error;
}
} else {
- struct relay_connection *ctrl_conn;
+ struct relay_connection *connection;
- ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd);
+ connection = connection_get_by_sock(relay_connections_ht, pollfd);
/* If not found, there is a synchronization issue. */
- assert(ctrl_conn);
-
- if (ctrl_conn->type == RELAY_DATA) {
- if (revents & LPOLLIN) {
- /*
- * Flag the last seen data fd not deleted. It will be
- * used as the last seen fd if any fd gets deleted in
- * this first loop.
- */
- last_notdel_data_fd = pollfd;
- }
- goto put_ctrl_connection;
+ assert(connection);
+ if (connection->activity_phase == current_activity_phase) {
+ DBG3("Skipping connection %d, already processed in activity phase %" PRIu64, connection->sock->fd, current_activity_phase);
+ goto connection_put;
}
- assert(ctrl_conn->type == RELAY_CONTROL);
if (revents & LPOLLIN) {
enum relay_connection_status status;
- status = relay_process_control(ctrl_conn);
+ at_least_one_event_processed = true;
+ connection->activity_phase = current_activity_phase;
+
+ if (connection->type == RELAY_DATA) {
+ status = relay_process_data(connection);
+ } else {
+ status = relay_process_control(connection);
+ }
if (status != RELAY_CONNECTION_STATUS_OK) {
/*
* On socket error flag the session as aborted to force
* connection is closed (uncleanly) before the packet's
* data provided.
*
- * Since the control connection encountered an error,
+ * Since the connection encountered an error,
* it is okay to be conservative and close the
* session right now as we can't rely on the protocol
* being respected anymore.
*/
if (status == RELAY_CONNECTION_STATUS_ERROR) {
- session_abort(ctrl_conn->session);
+ session_abort(connection->session);
}
/* Clear the connection on error or close. */
relay_thread_close_connection(&events,
pollfd,
- ctrl_conn);
+ connection);
}
- seen_control = 1;
} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
relay_thread_close_connection(&events,
- pollfd, ctrl_conn);
- if (last_seen_data_fd == pollfd) {
- last_seen_data_fd = last_notdel_data_fd;
- }
+ pollfd, connection);
} else {
- ERR("Unexpected poll events %u for control sock %d",
+ ERR("Unexpected poll events %u for sock %d",
revents, pollfd);
- connection_put(ctrl_conn);
+ connection_put(connection);
goto error;
}
- put_ctrl_connection:
- connection_put(ctrl_conn);
+ connection_put:
+ connection_put(connection);
}
}
- /*
- * The last loop handled a control request, go back to poll to make
- * sure we prioritise the control socket.
- */
- if (seen_control) {
- continue;
+ if (!at_least_one_event_processed) {
+ current_activity_phase++;
+ DBG3("Incrementing activity phase: %" PRIu64, current_activity_phase);
}
-
- if (last_seen_data_fd >= 0) {
- for (i = 0; i < nb_fd; i++) {
- int pollfd = LTTNG_POLL_GETFD(&events, i);
-
- health_code_update();
-
- if (last_seen_data_fd == pollfd) {
- idx = i;
- break;
- }
- }
- }
-
- /* Process data connection. */
- for (i = idx + 1; i < nb_fd; i++) {
- /* Fetch the poll data. */
- uint32_t revents = LTTNG_POLL_GETEV(&events, i);
- int pollfd = LTTNG_POLL_GETFD(&events, i);
- struct relay_connection *data_conn;
-
- health_code_update();
-
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
- /* Skip the command pipe. It's handled in the first loop. */
- if (pollfd == relay_conn_pipe[0]) {
- continue;
- }
-
- data_conn = connection_get_by_sock(relay_connections_ht, pollfd);
- if (!data_conn) {
- /* Skip it. Might be removed before. */
- continue;
- }
- if (data_conn->type == RELAY_CONTROL) {
- goto put_data_connection;
- }
- assert(data_conn->type == RELAY_DATA);
-
- if (revents & LPOLLIN) {
- enum relay_connection_status status;
-
- status = relay_process_data(data_conn);
- /* Connection closed or error. */
- if (status != RELAY_CONNECTION_STATUS_OK) {
- /*
- * On socket error flag the session as aborted to force
- * the cleanup of its stream otherwise it can leak
- * during the lifetime of the relayd.
- *
- * This prevents situations in which streams can be
- * left opened because an index was received, the
- * control connection is closed, and the data
- * connection is closed (uncleanly) before the packet's
- * data provided.
- *
- * Since the data connection encountered an error,
- * it is okay to be conservative and close the
- * session right now as we can't rely on the protocol
- * being respected anymore.
- */
- if (status == RELAY_CONNECTION_STATUS_ERROR) {
- session_abort(data_conn->session);
- }
- relay_thread_close_connection(&events, pollfd,
- data_conn);
- /*
- * Every goto restart call sets the last seen fd where
- * here we don't really care since we gracefully
- * continue the loop after the connection is deleted.
- */
- } else {
- /* Keep last seen port. */
- last_seen_data_fd = pollfd;
- connection_put(data_conn);
- goto restart;
- }
- } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- relay_thread_close_connection(&events, pollfd,
- data_conn);
- } else {
- ERR("Unknown poll events %u for data sock %d",
- revents, pollfd);
- }
- put_data_connection:
- connection_put(data_conn);
- }
- last_seen_data_fd = -1;
}
/* Normal exit, no error */
/* Cleanup reamaining connection object. */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
- destroy_conn,
+ tmp_conn,
sock_n.node) {
health_code_update();
- session_abort(destroy_conn->session);
+ session_abort(tmp_conn->session);
/*
* No need to grab another ref, because we own
- * destroy_conn.
+ * tmp conn.
*/
- relay_thread_close_connection(&events, destroy_conn->sock->fd,
- destroy_conn);
+ relay_thread_close_connection(&events, tmp_conn->sock->fd,
+ tmp_conn);
}
rcu_read_unlock();
*/
rcu_register_thread();
- the_fd_tracker = fd_tracker_create(lttng_opt_fd_cap);
+ the_fd_tracker = fd_tracker_create(lttng_opt_fd_pool_size);
if (!the_fd_tracker) {
retval = -1;
goto exit_options;
ret = track_stdio();
if (ret) {
retval = -1;
- goto exit_options;
+ goto exit_tracker;
}
/* Initialize thread health monitoring */
exit_init_data:
health_app_destroy(health_relayd);
exit_health_app_create:
-exit_options:
+
/*
* Wait for all pending call_rcu work to complete before tearing
* down data structures. call_rcu worker may be trying to
/* Ensure all prior call_rcu are done. */
rcu_barrier();
-
+exit_tracker:
untrack_stdio();
/*
* fd_tracker_destroy() will log the contents of the fd-tracker
*/
fd_tracker_destroy(the_fd_tracker);
rcu_unregister_thread();
-
+exit_options:
if (!retval) {
exit(EXIT_SUCCESS);
} else {