X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=19b1342ac5deeffa9ac2fc8c886779c2fa5fbd1d;hb=d9b9088d65b39e019f0d663df9fc5383408d60a3;hp=0d270d1ca29e5c0e0c5e0d642011d583198ca5bd;hpb=b08ff80046482ccbb02725f2a74dcdf7a5cf6a54;p=lttng-tools.git

diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
index 0d270d1ca..19b1342ac 100644
--- a/src/bin/lttng-relayd/main.c
+++ b/src/bin/lttng-relayd/main.c
@@ -59,6 +59,7 @@
 #include 
 #include 
 #include 
+#include 
 #include 
 #include "version.h"
@@ -151,7 +152,7 @@ static uint64_t last_relay_stream_id;
 static struct relay_conn_queue relay_conn_queue;
 
 /* Cap of file descriptors to be in simultaneous use by the relay daemon. */
-static unsigned int lttng_opt_fd_cap;
+static unsigned int lttng_opt_fd_pool_size = -1;
 
 /* Global relay stream hash table. */
 struct lttng_ht *relay_streams_ht;
@@ -175,7 +176,7 @@ static struct option long_options[] = {
 	{ "daemonize", 0, 0, 'd', },
 	{ "background", 0, 0, 'b', },
 	{ "group", 1, 0, 'g', },
-	{ "fd-cap", 1, 0, '\0', },
+	{ "fd-pool-size", 1, 0, '\0', },
 	{ "help", 0, 0, 'h', },
 	{ "output", 1, 0, 'o', },
 	{ "verbose", 0, 0, 'v', },
@@ -218,28 +219,22 @@ static int set_option(int opt, const char *arg, const char *optname)
 
 	switch (opt) {
 	case 0:
-		if (!strcmp(optname, "fd-cap")) {
+		if (!strcmp(optname, "fd-pool-size")) {
 			unsigned long v;
 
 			errno = 0;
 			v = strtoul(arg, NULL, 0);
 			if (errno != 0 || !isdigit(arg[0])) {
-				ERR("Wrong value in --fd-cap parameter: %s", arg);
+				ERR("Wrong value in --fd-pool-size parameter: %s", arg);
 				ret = -1;
 				goto end;
 			}
-			if (v < DEFAULT_RELAYD_MINIMAL_FD_CAP) {
-				ERR("File descriptor cap must be set to at least %d",
-					DEFAULT_RELAYD_MINIMAL_FD_CAP);
-			}
 			if (v >= UINT_MAX) {
-				ERR("File descriptor cap overflow in --fd-cap parameter: %s", arg);
+				ERR("File descriptor pool size overflow in --fd-pool-size parameter: %s", arg);
 				ret = -1;
 				goto end;
 			}
-			lttng_opt_fd_cap = (unsigned int) v;
-			DBG3("File descriptor cap set to %u", lttng_opt_fd_cap);
-
+			lttng_opt_fd_pool_size = (unsigned int) v;
 		} else {
 			fprintf(stderr, "unknown option %s", optname);
 			if (arg) {
@@ -453,6 +448,57 @@ static void parse_env_options(void)
 	}
 }
 
+static int set_fd_pool_size(void)
+{
+	int ret = 0;
+	struct rlimit rlimit;
+
+	ret = getrlimit(RLIMIT_NOFILE, &rlimit);
+	if (ret) {
+		PERROR("Failed to get file descriptor limit");
+		ret = -1;
+		goto end;
+	}
+
+	DBG("File descriptor count limits are %" PRIu64 " (soft) and %" PRIu64 " (hard)",
+			(uint64_t) rlimit.rlim_cur, (uint64_t) rlimit.rlim_max);
+	if (lttng_opt_fd_pool_size == -1) {
+		/* Use default value (soft limit - reserve). */
+		if (rlimit.rlim_cur < DEFAULT_RELAYD_MIN_FD_POOL_SIZE) {
+			ERR("The process' file number limit is too low (%" PRIu64 "). The process' file number limit must be set to at least %i.",
+					(uint64_t) rlimit.rlim_cur,
+					DEFAULT_RELAYD_MIN_FD_POOL_SIZE);
+			ret = -1;
+			goto end;
+		}
+		lttng_opt_fd_pool_size = rlimit.rlim_cur -
+				DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE;
+		goto end;
+	}
+
+	if (lttng_opt_fd_pool_size < DEFAULT_RELAYD_MIN_FD_POOL_SIZE) {
+		ERR("File descriptor pool size must be set to at least %d",
+				DEFAULT_RELAYD_MIN_FD_POOL_SIZE);
+		ret = -1;
+		goto end;
+	}
+
+	if (lttng_opt_fd_pool_size > rlimit.rlim_cur) {
+		ERR("File descriptor pool size argument (%u) exceeds the process' soft limit (%" PRIu64 ").",
+				lttng_opt_fd_pool_size, (uint64_t) rlimit.rlim_cur);
+		ret = -1;
+		goto end;
+	}
+
+	DBG("File descriptor pool size argument (%u) adjusted to %u to accommodate transient fd uses",
+			lttng_opt_fd_pool_size,
+			lttng_opt_fd_pool_size - DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE);
+	lttng_opt_fd_pool_size -= DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE;
+end:
+	return ret;
+}
+
 static int set_options(int argc, char **argv)
 {
 	int c, ret = 0, option_index = 0, retval = 0;
@@ -570,17 +616,10 @@ static int set_options(int argc, char **argv)
 			goto exit;
 		}
 	}
-	if (lttng_opt_fd_cap == 0) {
-		int ret;
-		struct rlimit rlimit;
-
-		ret = getrlimit(RLIMIT_NOFILE, &rlimit);
-		if (ret) {
-			PERROR("Failed to get file descriptor limit");
-			retval = -1;
-		}
-
-		lttng_opt_fd_cap = rlimit.rlim_cur;
+	ret = set_fd_pool_size();
+	if (ret) {
+		retval = -1;
+		goto exit;
 	}
 
 	if (!opt_group_output_by_session && !opt_group_output_by_host) {
@@ -620,7 +659,7 @@ static void relayd_cleanup(void)
 	free(opt_output_path);
 
 	/* Close thread quit pipes */
-	utils_close_pipe(thread_quit_pipe);
+	(void) fd_tracker_util_pipe_close(the_fd_tracker, thread_quit_pipe);
 
 	uri_free(control_uri);
 	uri_free(data_uri);
@@ -629,7 +668,6 @@ static void relayd_cleanup(void)
 	if (tracing_group_name_override) {
 		free((void *) tracing_group_name);
 	}
-	fd_tracker_log(the_fd_tracker);
 }
 
 /*
@@ -783,17 +821,26 @@ void lttng_relay_notify_ready(void)
  */
 static int init_thread_quit_pipe(void)
 {
-	int ret;
-
-	ret = utils_create_pipe_cloexec(thread_quit_pipe);
+	return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
+			"Quit pipe", thread_quit_pipe);
+}
 
-	return ret;
+/*
+ * Init health quit pipe.
+ *
+ * Return -1 on error or 0 if the pipe was created.
+ */
+static int init_health_quit_pipe(void)
+{
+	return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
+			"Health quit pipe", health_quit_pipe);
 }
 
 /*
  * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
 */
-static int create_thread_poll_set(struct lttng_poll_event *events, int size)
+static int create_named_thread_poll_set(struct lttng_poll_event *events,
+		int size, const char *name)
 {
 	int ret;
 
@@ -802,10 +849,8 @@ static int create_thread_poll_set(struct lttng_poll_event *events, int size)
 		goto error;
 	}
 
-	ret = lttng_poll_create(events, size, LTTNG_CLOEXEC);
+	ret = fd_tracker_util_poll_create(the_fd_tracker,
+			name, events, 1, LTTNG_CLOEXEC);
 	if (ret < 0) {
 		goto error;
 	}
 
 	/* Add quit pipe */
 	ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
@@ -833,13 +878,55 @@ static int check_thread_quit_pipe(int fd, uint32_t events)
 	return 0;
 }
 
+static int create_sock(void *data, int *out_fd)
+{
+	int ret;
+	struct lttcomm_sock *sock = data;
+
+	ret = lttcomm_create_sock(sock);
+	if (ret < 0) {
+		goto end;
+	}
+
+	*out_fd = sock->fd;
+end:
+	return ret;
+}
+
+static int close_sock(void *data, int *in_fd)
+{
+	struct lttcomm_sock *sock = data;
+
+	return sock->ops->close(sock);
+}
+
+static int accept_sock(void *data, int *out_fd)
+{
+	int ret = 0;
+	/* Socks is an array of in_sock, out_sock. */
+	struct lttcomm_sock **socks = data;
+	struct lttcomm_sock *in_sock = socks[0];
+
+	socks[1] = in_sock->ops->accept(in_sock);
+	if (!socks[1]) {
+		ret = -1;
+		goto end;
+	}
+	*out_fd = socks[1]->fd;
+end:
+	return ret;
+}
 
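The create_sock/close_sock/accept_sock callbacks above follow the fd-tracker's "unsuspendable fd" contract: the tracker accounts for an entry in its descriptor pool first, then runs a user callback that performs the actual open (or close) and reports the fd involved. The program below illustrates that contract in isolation. It is a minimal, self-contained sketch, not lttng-tools code: toy_tracker, toy_tracker_open and toy_tracker_close are hypothetical stand-ins for the fd-tracker API, which is internal to the project.

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical stand-in for the fd-tracker: only enforces a budget. */
struct toy_tracker {
	unsigned int in_use;
	unsigned int capacity;
};

typedef int (*open_cb)(void *data, int *out_fd);
typedef int (*close_cb)(void *data, int *in_fd);

/*
 * Account for the fd before invoking the user callback, mirroring what
 * the real tracker does for "unsuspendable" fds (sockets, pipes).
 */
static int toy_tracker_open(struct toy_tracker *t, int *out_fd,
		const char *name, open_cb cb, void *data)
{
	if (t->in_use >= t->capacity) {
		fprintf(stderr, "fd budget exhausted opening %s\n", name);
		return -1;
	}
	if (cb(data, out_fd)) {
		return -1;
	}
	t->in_use++;
	return 0;
}

static int toy_tracker_close(struct toy_tracker *t, int *in_fd,
		close_cb cb, void *data)
{
	int ret = cb(data, in_fd);

	if (!ret) {
		t->in_use--;
	}
	return ret;
}

/* Example callback pair in the style of create_sock/close_sock. */
static int open_dev_null(void *data, int *out_fd)
{
	*out_fd = open("/dev/null", O_WRONLY | O_CLOEXEC);
	return *out_fd < 0 ? -1 : 0;
}

static int close_fd(void *data, int *in_fd)
{
	return close(*in_fd);
}

int main(void)
{
	struct toy_tracker tracker = { .in_use = 0, .capacity = 16 };
	int fd;

	if (toy_tracker_open(&tracker, &fd, "sink", open_dev_null, NULL)) {
		return 1;
	}
	(void) toy_tracker_close(&tracker, &fd, close_fd, NULL);
	return 0;
}

Routing the open through a callback keeps the tracker's bookkeeping and the actual resource creation together, which is why relay_socket_create() below hands create_sock to the tracker instead of calling lttcomm_create_sock() directly.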
 /*
  * Create and init socket from uri.
  */
-static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri)
+static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri,
+		const char *name)
 {
-	int ret;
+	int ret, sock_fd;
 	struct lttcomm_sock *sock = NULL;
+	char uri_str[PATH_MAX];
+	char *formated_name = NULL;
 
 	sock = lttcomm_alloc_sock_from_uri(uri);
 	if (sock == NULL) {
@@ -847,11 +934,25 @@ static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri)
 		goto error;
 	}
 
-	ret = lttcomm_create_sock(sock);
-	if (ret < 0) {
-		goto error;
+	/*
+	 * Don't fail to create the socket if the name can't be built as it is
+	 * only used for debugging purposes.
+	 */
+	ret = uri_to_str_url(uri, uri_str, sizeof(uri_str));
+	uri_str[sizeof(uri_str) - 1] = '\0';
+	if (ret >= 0) {
+		ret = asprintf(&formated_name, "%s socket @ %s", name,
+				uri_str);
+		if (ret < 0) {
+			formated_name = NULL;
+		}
 	}
-	DBG("Listening on sock %d", sock->fd);
+
+	ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd,
+			(const char **) (formated_name ? &formated_name : NULL),
+			1, create_sock, sock);
+	free(formated_name);
+	if (ret) {
+		goto error;
+	}
+	DBG("Listening on %s socket %d", name, sock->fd);
 
 	ret = sock->ops->bind(sock);
 	if (ret < 0) {
@@ -873,6 +974,27 @@ error:
 	return NULL;
 }
 
+static
+struct lttcomm_sock *accept_relayd_sock(struct lttcomm_sock *listening_sock,
+		const char *name)
+{
+	int out_fd, ret;
+	struct lttcomm_sock *socks[2] = { listening_sock, NULL };
+	struct lttcomm_sock *new_sock = NULL;
+
+	ret = fd_tracker_open_unsuspendable_fd(
+			the_fd_tracker, &out_fd,
+			(const char **) &name,
+			1, accept_sock, &socks);
+	if (ret) {
+		goto end;
+	}
+	new_sock = socks[1];
+	DBG("%s accepted, socket %d", name, new_sock->fd);
+end:
+	return new_sock;
+}
+
 /*
  * This thread manages the listening for new connections on the network
  */
@@ -889,12 +1011,12 @@ static void *relay_thread_listener(void *data)
 
 	health_code_update();
 
-	control_sock = relay_socket_create(control_uri);
+	control_sock = relay_socket_create(control_uri, "Control listener");
 	if (!control_sock) {
 		goto error_sock_control;
 	}
 
-	data_sock = relay_socket_create(data_uri);
+	data_sock = relay_socket_create(data_uri, "Data listener");
 	if (!data_sock) {
 		goto error_sock_relay;
 	}
@@ -903,7 +1025,7 @@ static void *relay_thread_listener(void *data)
 	 * Pass 3 as size here for the thread quit pipe, control and
 	 * data socket.
 	 */
-	ret = create_thread_poll_set(&events, 3);
+	ret = create_named_thread_poll_set(&events, 3, "Listener thread epoll");
 	if (ret < 0) {
 		goto error_create_poll;
 	}
@@ -979,20 +1101,18 @@ restart:
 			 */
 			int val = 1;
 			struct relay_connection *new_conn;
-			struct lttcomm_sock *newsock;
+			struct lttcomm_sock *newsock = NULL;
 			enum connection_type type;
 
 			if (pollfd == data_sock->fd) {
 				type = RELAY_DATA;
-				newsock = data_sock->ops->accept(data_sock);
-				DBG("Relay data connection accepted, socket %d",
-						newsock->fd);
+				newsock = accept_relayd_sock(data_sock,
+						"Data socket to relayd");
 			} else {
 				assert(pollfd == control_sock->fd);
 				type = RELAY_CONTROL;
-				newsock = control_sock->ops->accept(control_sock);
-				DBG("Relay control connection accepted, socket %d",
-						newsock->fd);
+				newsock = accept_relayd_sock(control_sock,
+						"Control socket to relayd");
 			}
 			if (!newsock) {
 				PERROR("accepting sock");
@@ -1045,10 +1165,12 @@
 exit:
 error:
 error_poll_add:
 error_testpoint:
-	lttng_poll_clean(&events);
+	(void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
 error_create_poll:
 	if (data_sock->fd >= 0) {
-		ret = data_sock->ops->close(data_sock);
+		ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
+				&data_sock->fd, 1, close_sock,
+				data_sock);
 		if (ret) {
 			PERROR("close");
 		}
@@ -1056,7 +1178,9 @@
 	lttcomm_destroy_sock(data_sock);
error_sock_relay:
 	if (control_sock->fd >= 0) {
-		ret = control_sock->ops->close(control_sock);
+		ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
+				&control_sock->fd, 1, close_sock,
+				control_sock);
 		if (ret) {
 			PERROR("close");
 		}
@@ -1507,9 +1631,8 @@ int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
 		goto end_unlock;
 	}
 
-	ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
-			0, 0, -1, -1, stream->stream_fd->fd, NULL,
-			&stream->stream_fd->fd);
+	ret = stream_fd_rotate(stream->stream_fd,
+			stream->path_name, stream->channel_name, 0, 0, NULL);
 	if (ret < 0) {
 		ERR("Failed to rotate metadata file %s of channel %s",
 			stream->path_name, stream->channel_name);
@@ -1629,6 +1752,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
 	struct lttcomm_relayd_metadata_payload metadata_payload_header;
 	struct relay_stream *metadata_stream;
 	uint64_t metadata_payload_size;
+	int metadata_fd = -1;
 
 	if (!session) {
 		ERR("Metadata sent before version check");
@@ -1659,20 +1783,24 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
 
 	pthread_mutex_lock(&metadata_stream->lock);
 
-	size_ret = lttng_write(metadata_stream->stream_fd->fd,
+	metadata_fd = stream_fd_get_fd(metadata_stream->stream_fd);
+	if (metadata_fd < 0) {
+		ret = -1;
+		goto end_put;
+	}
+	size_ret = lttng_write(metadata_fd,
 			payload->data + sizeof(metadata_payload_header),
 			metadata_payload_size);
 	if (size_ret < metadata_payload_size) {
 		ERR("Relay error writing metadata on file");
 		ret = -1;
-		goto end_put;
+		goto end_put_fd;
 	}
 
-	size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+	size_ret = write_padding_to_file(metadata_fd,
 			metadata_payload_header.padding_size);
 	if (size_ret < (int64_t) metadata_payload_header.padding_size) {
 		ret = -1;
-		goto end_put;
+		goto end_put_fd;
 	}
 
 	metadata_stream->metadata_received +=
@@ -1680,6 +1808,8 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
 	DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
 		metadata_stream->metadata_received);
 
+end_put_fd:
+	stream_fd_put_fd(metadata_stream->stream_fd);
 end_put:
 	pthread_mutex_unlock(&metadata_stream->lock);
 	stream_put(metadata_stream);
@@ -2156,8 +2286,13 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
 		/* no flush. */
 		ret = 0;
 	} else {
+		/*
+		 * ret < 0
+		 *
+		 * relay_index_try_flush is responsible for the self-reference
+		 * put of the index object on error.
+		 */
 		ERR("relay_index_try_flush error %d", ret);
-		relay_index_put(index);
 		ret = -1;
 	}
 
@@ -2525,14 +2660,14 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
 		/* Put ref on previous index_file. */
 		if (stream->index_file) {
-			lttng_index_file_put(stream->index_file);
+			relay_index_file_put(stream->index_file);
 			stream->index_file = NULL;
 		}
 		major = stream->trace->session->major;
 		minor = stream->trace->session->minor;
-		stream->index_file = lttng_index_file_create(stream->path_name,
+		stream->index_file = relay_index_file_create(stream->path_name,
 				stream->channel_name,
-				-1, -1, stream->tracefile_size,
+				stream->tracefile_size,
 				tracefile_array_get_file_index_head(stream->tfa),
 				lttng_to_index_major(major, minor),
 				lttng_to_index_minor(major, minor));
@@ -2561,9 +2696,13 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 		/* No flush. */
 		ret = 0;
 	} else {
-		/* Put self-ref for this index due to error. */
-		relay_index_put(index);
-		index = NULL;
+		/*
+		 * ret < 0
+		 *
+		 * relay_index_try_flush is responsible for the self-reference
+		 * put of the index object on error.
+		 */
+		ERR("relay_index_try_flush error %d", ret);
 		ret = -1;
 	}
 end:
@@ -2660,11 +2799,9 @@ static enum relay_connection_status relay_process_data_receive_header(
-		/* new_id is updated by utils_rotate_stream_file. */
+		/* new_id is updated by stream_fd_rotate. */
 		new_id = old_id;
 
-		ret = utils_rotate_stream_file(stream->path_name,
+		ret = stream_fd_rotate(stream->stream_fd, stream->path_name,
 			stream->channel_name, stream->tracefile_size,
-			stream->tracefile_count, -1,
-			-1, stream->stream_fd->fd,
-			&new_id, &stream->stream_fd->fd);
+			stream->tracefile_count, &new_id);
 		if (ret < 0) {
 			ERR("Failed to rotate stream output file");
 			status = RELAY_CONNECTION_STATUS_ERROR;
@@ -2701,6 +2838,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
 	bool new_stream = false, close_requested = false;
 	uint64_t left_to_receive = state->left_to_receive;
 	struct relay_session *session;
+	int stream_fd = -1;
 
 	DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
 			state->header.stream_id, state->header.net_seq_num,
@@ -2725,6 +2863,12 @@ static enum relay_connection_status relay_process_data_receive_payload(
 		}
 	}
 
+	stream_fd = stream_fd_get_fd(stream->stream_fd);
+	if (stream_fd < 0) {
+		status = RELAY_CONNECTION_STATUS_ERROR;
+		goto end_stream_unlock;
+	}
+
 	/*
 	 * The size of the "chunk" received on any iteration is bounded by:
 	 * - the data left to receive,
@@ -2742,7 +2886,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
 			PERROR("Socket %d error", conn->sock->fd);
 			status = RELAY_CONNECTION_STATUS_ERROR;
 		}
-		goto end_stream_unlock;
+		goto end_put_fd;
 	} else if (ret == 0) {
 		/* No more data ready to be consumed on socket. */
 		DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
@@ -2760,12 +2904,12 @@ static enum relay_connection_status relay_process_data_receive_payload(
 	recv_size = ret;
 
 	/* Write data to stream output fd. */
-	write_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+	write_ret = lttng_write(stream_fd, data_buffer,
 			recv_size);
 	if (write_ret < (ssize_t) recv_size) {
 		ERR("Relay error writing data to file");
 		status = RELAY_CONNECTION_STATUS_ERROR;
-		goto end_stream_unlock;
+		goto end_put_fd;
 	}
 
 	left_to_receive -= recv_size;
@@ -2784,17 +2928,17 @@ static enum relay_connection_status relay_process_data_receive_payload(
 		DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
 				state->header.stream_id, state->received,
 				state->left_to_receive);
-		goto end_stream_unlock;
+		goto end_put_fd;
 	}
 
-	ret = write_padding_to_file(stream->stream_fd->fd,
+	ret = write_padding_to_file(stream_fd,
 			state->header.padding_size);
 	if ((int64_t) ret < (int64_t) state->header.padding_size) {
 		ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
 			stream->stream_handle, state->header.net_seq_num, ret);
 		status = RELAY_CONNECTION_STATUS_ERROR;
-		goto end_stream_unlock;
+		goto end_put_fd;
 	}
 
@@ -2806,7 +2950,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
 				stream->stream_handle,
 				state->header.net_seq_num, ret);
 			status = RELAY_CONNECTION_STATUS_ERROR;
-			goto end_stream_unlock;
+			goto end_put_fd;
 		}
 	}
 
@@ -2827,6 +2971,8 @@ static enum relay_connection_status relay_process_data_receive_payload(
 	connection_reset_protocol_state(conn);
 	state = NULL;
 
+end_put_fd:
+	stream_fd_put_fd(stream->stream_fd);
 end_stream_unlock:
 	close_requested = stream->close_requested;
 	pthread_mutex_unlock(&stream->lock);
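The receive path above no longer caches stream_fd->fd across blocking I/O: it pins the descriptor with stream_fd_get_fd() for the duration of the write and releases it with stream_fd_put_fd(), so the fd-tracker may suspend (close) idle stream files when the pool is exhausted and restore them on the next access. The following is a self-contained sketch of that get/put discipline only; toy_stream_fd is a hypothetical stand-in for the relay's stream_fd object, whose implementation is not part of this patch.

#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/*
 * Hypothetical suspendable stream fd: the backing fd may be closed
 * behind the user's back, so it must be pinned around every I/O burst
 * instead of being cached.
 */
struct toy_stream_fd {
	const char *path;
	int fd;		/* -1 when suspended. */
	int pin_count;
};

static int toy_stream_fd_get(struct toy_stream_fd *sf)
{
	if (sf->fd < 0) {
		/* Lazily restore the fd, appending to the same file. */
		sf->fd = open(sf->path, O_WRONLY | O_CREAT | O_APPEND, 0644);
		if (sf->fd < 0) {
			return -1;
		}
	}
	sf->pin_count++;
	return sf->fd;
}

static void toy_stream_fd_put(struct toy_stream_fd *sf)
{
	sf->pin_count--;
	/* An fd with pin_count == 0 is eligible for suspension. */
}

int main(void)
{
	struct toy_stream_fd sf = { .path = "/tmp/toy_stream", .fd = -1 };
	const char payload[] = "packet\n";
	int fd = toy_stream_fd_get(&sf);

	if (fd < 0) {
		return 1;
	}
	if (write(fd, payload, strlen(payload)) < 0) {
		perror("write");
	}
	toy_stream_fd_put(&sf);
	return 0;
}

The relay's version must also handle tracefile rotation and locking; the sketch keeps only the pin/unpin rule that the end_put_fd labels above implement.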
@@ -2874,7 +3020,8 @@ static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 
 	(void) lttng_poll_del(events, pollfd);
 
-	ret = close(pollfd);
+	ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1,
+			fd_tracker_util_close_fd, NULL);
 	if (ret < 0) {
 		ERR("Closing pollfd %d", pollfd);
 	}
@@ -2911,12 +3058,14 @@
  */
 static void *relay_thread_worker(void *data)
 {
-	int ret, err = -1, last_seen_data_fd = -1;
+	int ret, err = -1;
 	uint32_t nb_fd;
 	struct lttng_poll_event events;
 	struct lttng_ht *relay_connections_ht;
 	struct lttng_ht_iter iter;
-	struct relay_connection *destroy_conn = NULL;
+	struct relay_connection *tmp_conn = NULL;
+	uint64_t relay_conn_pipe_activity_phase = 0;
+	uint64_t current_activity_phase = 1;
 
 	DBG("[thread] Relay worker started");
 
@@ -2936,7 +3085,7 @@ static void *relay_thread_worker(void *data)
 		goto relay_connections_ht_error;
 	}
 
-	ret = create_thread_poll_set(&events, 2);
+	ret = create_named_thread_poll_set(&events, 2, "Worker thread epoll");
 	if (ret < 0) {
 		goto error_poll_create;
 	}
@@ -2948,7 +3097,8 @@ static void *relay_thread_worker(void *data)
 
 restart:
 	while (1) {
-		int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+		int i;
+		bool at_least_one_event_processed = false;
 
 		health_code_update();
 
@@ -2970,9 +3120,11 @@ restart:
 		nb_fd = ret;
 
 		/*
-		 * Process control. The control connection is
-		 * prioritized so we don't starve it with high
-		 * throughput tracing data on the data connection.
+		 * Process FDs that were not yet active during the current
+		 * activity phase. Increment the activity phase when all
+		 * events have already been processed for the current
+		 * phase. This results in fairness across all connections
+		 * and the connection pipe.
 		 */
 		for (i = 0; i < nb_fd; i++) {
 			/* Fetch once the poll data */
@@ -3001,14 +3153,32 @@ restart:
 			if (revents & LPOLLIN) {
 				struct relay_connection *conn;
 
+				if (relay_conn_pipe_activity_phase == current_activity_phase) {
+					/*
+					 * Only accept new connections once
+					 * per activity phase for fairness.
+					 */
+					DBG("Skipping connection reception, already handled in activity phase %" PRIu64, current_activity_phase);
+					continue;
+				}
+
 				ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
 				if (ret < 0) {
 					goto error;
 				}
+				/*
+				 * For now, we prefer fairness over the
+				 * immediate handling of a new connection.
+				 * Mark the connection as processed in the
+				 * current activity phase.
+				 */
+				conn->activity_phase = current_activity_phase;
 				lttng_poll_add(&events, conn->sock->fd,
 						LPOLLIN | LPOLLRDHUP);
 				connection_ht_add(relay_connections_ht, conn);
 				DBG("Connection socket %d added", conn->sock->fd);
+				relay_conn_pipe_activity_phase = current_activity_phase;
+				at_least_one_event_processed = true;
 			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
 				ERR("Relay connection pipe error");
 				goto error;
@@ -3017,29 +3187,27 @@
 				goto error;
 			}
 		} else {
-			struct relay_connection *ctrl_conn;
+			struct relay_connection *connection;
 
-			ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd);
+			connection = connection_get_by_sock(relay_connections_ht, pollfd);
 			/* If not found, there is a synchronization issue. */
-			assert(ctrl_conn);
-
-			if (ctrl_conn->type == RELAY_DATA) {
-				if (revents & LPOLLIN) {
-					/*
-					 * Flag the last seen data fd not deleted. It will be
-					 * used as the last seen fd if any fd gets deleted in
-					 * this first loop.
-					 */
-					last_notdel_data_fd = pollfd;
-				}
-				goto put_ctrl_connection;
+			assert(connection);
+			if (connection->activity_phase == current_activity_phase) {
+				DBG3("Skipping connection %d, already processed in activity phase %" PRIu64, connection->sock->fd, current_activity_phase);
+				goto connection_put;
 			}
-			assert(ctrl_conn->type == RELAY_CONTROL);
 
 			if (revents & LPOLLIN) {
 				enum relay_connection_status status;
 
-				status = relay_process_control(ctrl_conn);
+				at_least_one_event_processed = true;
+				connection->activity_phase = current_activity_phase;
+
+				if (connection->type == RELAY_DATA) {
+					status = relay_process_data(connection);
+				} else {
+					status = relay_process_control(connection);
+				}
 				if (status != RELAY_CONNECTION_STATUS_OK) {
 					/*
 					 * On socket error flag the session as aborted to force
@@ -3052,137 +3220,38 @@
 					 * the cleanup of its stream otherwise it can leak
 					 * during the lifetime of the relayd.
 					 *
 					 * This prevents situations in which streams can be
 					 * left opened because an index was received, the
 					 * control connection is closed, and the data
 					 * connection is closed (uncleanly) before the packet's
 					 * data provided.
 					 *
-					 * Since the control connection encountered an error,
+					 * Since the connection encountered an error,
 					 * it is okay to be conservative and close the
 					 * session right now as we can't rely on the protocol
 					 * being respected anymore.
 					 */
 					if (status == RELAY_CONNECTION_STATUS_ERROR) {
-						session_abort(ctrl_conn->session);
+						session_abort(connection->session);
 					}
 
 					/* Clear the connection on error or close. */
 					relay_thread_close_connection(&events, pollfd,
-							ctrl_conn);
+							connection);
 				}
-				seen_control = 1;
 			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
 				relay_thread_close_connection(&events,
-						pollfd, ctrl_conn);
-				if (last_seen_data_fd == pollfd) {
-					last_seen_data_fd = last_notdel_data_fd;
-				}
+						pollfd, connection);
 			} else {
-				ERR("Unexpected poll events %u for control sock %d",
+				ERR("Unexpected poll events %u for sock %d",
 						revents, pollfd);
-				connection_put(ctrl_conn);
+				connection_put(connection);
 				goto error;
 			}
-	put_ctrl_connection:
-			connection_put(ctrl_conn);
+	connection_put:
+			connection_put(connection);
 		}
 	}
 
-		/*
-		 * The last loop handled a control request, go back to poll to make
-		 * sure we prioritise the control socket.
-		 */
-		if (seen_control) {
-			continue;
+		if (!at_least_one_event_processed) {
+			current_activity_phase++;
+			DBG3("Incrementing activity phase: %" PRIu64, current_activity_phase);
 		}
-
-		if (last_seen_data_fd >= 0) {
-			for (i = 0; i < nb_fd; i++) {
-				int pollfd = LTTNG_POLL_GETFD(&events, i);
-
-				health_code_update();
-
-				if (last_seen_data_fd == pollfd) {
-					idx = i;
-					break;
-				}
-			}
-		}
-
-		/* Process data connection. */
-		for (i = idx + 1; i < nb_fd; i++) {
-			/* Fetch the poll data. */
-			uint32_t revents = LTTNG_POLL_GETEV(&events, i);
-			int pollfd = LTTNG_POLL_GETFD(&events, i);
-			struct relay_connection *data_conn;
-
-			health_code_update();
-
-			if (!revents) {
-				/* No activity for this FD (poll implementation). */
-				continue;
-			}
-
-			/* Skip the command pipe. It's handled in the first loop. */
-			if (pollfd == relay_conn_pipe[0]) {
-				continue;
-			}
-
-			data_conn = connection_get_by_sock(relay_connections_ht, pollfd);
-			if (!data_conn) {
-				/* Skip it. Might be removed before. */
-				continue;
-			}
-			if (data_conn->type == RELAY_CONTROL) {
-				goto put_data_connection;
-			}
-			assert(data_conn->type == RELAY_DATA);
-
-			if (revents & LPOLLIN) {
-				enum relay_connection_status status;
-
-				status = relay_process_data(data_conn);
-				/* Connection closed or error. */
-				if (status != RELAY_CONNECTION_STATUS_OK) {
-					/*
-					 * On socket error flag the session as aborted to force
-					 * the cleanup of its stream otherwise it can leak
-					 * during the lifetime of the relayd.
-					 *
-					 * This prevents situations in which streams can be
-					 * left opened because an index was received, the
-					 * control connection is closed, and the data
-					 * connection is closed (uncleanly) before the packet's
-					 * data provided.
-					 *
-					 * Since the data connection encountered an error,
-					 * it is okay to be conservative and close the
-					 * session right now as we can't rely on the protocol
-					 * being respected anymore.
-					 */
-					if (status == RELAY_CONNECTION_STATUS_ERROR) {
-						session_abort(data_conn->session);
-					}
-					relay_thread_close_connection(&events, pollfd,
-							data_conn);
-					/*
-					 * Every goto restart call sets the last seen fd where
-					 * here we don't really care since we gracefully
-					 * continue the loop after the connection is deleted.
-					 */
-				} else {
-					/* Keep last seen port. */
-					last_seen_data_fd = pollfd;
-					connection_put(data_conn);
-					goto restart;
-				}
-			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-				relay_thread_close_connection(&events, pollfd,
-						data_conn);
-			} else {
-				ERR("Unknown poll events %u for data sock %d",
-						revents, pollfd);
-			}
-	put_data_connection:
-			connection_put(data_conn);
-		}
-		last_seen_data_fd = -1;
 	}
 
 	/* Normal exit, no error */
@@ -3193,27 +3262,28 @@ error:
 	/* Cleanup remaining connection objects. */
 	rcu_read_lock();
 	cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
-			destroy_conn,
+			tmp_conn,
 			sock_n.node) {
 		health_code_update();
 
-		session_abort(destroy_conn->session);
+		session_abort(tmp_conn->session);
 		/*
 		 * No need to grab another ref, because we own
-		 * destroy_conn.
+		 * tmp_conn.
 		 */
-		relay_thread_close_connection(&events, destroy_conn->sock->fd,
-				destroy_conn);
+		relay_thread_close_connection(&events, tmp_conn->sock->fd,
+				tmp_conn);
 	}
 	rcu_read_unlock();
 
-	lttng_poll_clean(&events);
+	(void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
error_poll_create:
 	lttng_ht_destroy(relay_connections_ht);
relay_connections_ht_error:
 	/* Close relay conn pipes */
-	utils_close_pipe(relay_conn_pipe);
+	(void) fd_tracker_util_pipe_close(the_fd_tracker,
+			relay_conn_pipe);
 	if (err) {
 		DBG("Thread exited with error");
 	}
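The worker's activity-phase scheme replaces the old control-first/data-second scheduling: every connection (and the connection pipe itself) is served at most once per phase, and the phase counter only advances once a wakeup finds nothing left to serve. The program below is a self-contained model of just that scheduling rule; toy_conn and run_wakeup are hypothetical and elide the poll set, hash table and error handling of the real thread.

#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical connection: only the scheduling state matters here. */
struct toy_conn {
	int id;
	uint64_t activity_phase;
};

/* Serve each ready connection at most once per activity phase. */
static void run_wakeup(struct toy_conn *ready[], int nb_ready,
		uint64_t *current_phase)
{
	bool at_least_one_event_processed = false;
	int i;

	for (i = 0; i < nb_ready; i++) {
		struct toy_conn *conn = ready[i];

		if (conn->activity_phase == *current_phase) {
			/* Already served during this phase; let others run. */
			continue;
		}
		conn->activity_phase = *current_phase;
		at_least_one_event_processed = true;
		printf("phase %" PRIu64 ": serving connection %d\n",
				*current_phase, conn->id);
	}
	if (!at_least_one_event_processed) {
		/* Everything ready was already served; open a new phase. */
		(*current_phase)++;
	}
}

int main(void)
{
	struct toy_conn conns[] = { { 0, 0 }, { 1, 0 }, { 2, 0 } };
	struct toy_conn *all[] = { &conns[0], &conns[1], &conns[2] };
	struct toy_conn *chatty[] = { &conns[0] };
	uint64_t phase = 1;

	/* First wakeup: every connection is ready and runs once. */
	run_wakeup(all, 3, &phase);
	/*
	 * A chatty connection is ready again, but it already ran in this
	 * phase, so the wakeup is empty and the phase advances.
	 */
	run_wakeup(chatty, 1, &phase);
	/* Now the chatty connection is served in the new phase. */
	run_wakeup(chatty, 1, &phase);
	return 0;
}

A connection that is constantly ready can thus never starve the others: it is deferred until everyone else ready in the same phase has been served, which is the starvation guard the removed two-pass loops approximated.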
@@ -3235,11 +3305,45 @@
  */
 static int create_relay_conn_pipe(void)
 {
-	int ret;
+	return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
+			"Relayd connection pipe", relay_conn_pipe);
+}
 
-	ret = utils_create_pipe_cloexec(relay_conn_pipe);
+static
+int stdio_open(void *data, int *fds)
+{
+	fds[0] = fileno(stdout);
+	fds[1] = fileno(stderr);
+	return 0;
+}
 
-	return ret;
+static
+int noop_close(void *data, int *fds)
+{
+	return 0;
+}
+
+static
+int track_stdio(void)
+{
+	int fds[2];
+	const char *names[] = { "stdout", "stderr" };
+
+	return fd_tracker_open_unsuspendable_fd(the_fd_tracker, fds,
+			names, 2, stdio_open, NULL);
+}
+
+static
+void untrack_stdio(void)
+{
+	int fds[] = { fileno(stdout), fileno(stderr) };
+
+	/*
+	 * noop_close is used since we don't really want to close
+	 * the stdio output fds; we merely want to stop tracking them.
+	 */
+	(void) fd_tracker_close_unsuspendable_fd(the_fd_tracker,
+			fds, 2, noop_close, NULL);
 }
 
 /*
@@ -3323,12 +3427,18 @@ int main(int argc, char **argv)
 	 */
 	rcu_register_thread();
 
-	the_fd_tracker = fd_tracker_create(lttng_opt_fd_cap);
+	the_fd_tracker = fd_tracker_create(lttng_opt_fd_pool_size);
 	if (!the_fd_tracker) {
 		retval = -1;
 		goto exit_options;
 	}
 
+	ret = track_stdio();
+	if (ret) {
+		retval = -1;
+		goto exit_tracker;
+	}
+
 	/* Initialize thread health monitoring */
 	health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
 	if (!health_relayd) {
@@ -3377,7 +3487,7 @@ int main(int argc, char **argv)
 		goto exit_init_data;
 	}
 
-	ret = utils_create_pipe(health_quit_pipe);
+	ret = init_health_quit_pipe();
 	if (ret) {
 		retval = -1;
 		goto exit_health_quit_pipe;
@@ -3473,13 +3583,13 @@ exit_dispatcher_thread:
 	}
 
exit_health_thread:
-	utils_close_pipe(health_quit_pipe);
+	(void) fd_tracker_util_pipe_close(the_fd_tracker, health_quit_pipe);
 
exit_health_quit_pipe:
 
exit_init_data:
 	health_app_destroy(health_relayd);
exit_health_app_create:
-exit_options:
+
 	/*
 	 * Wait for all pending call_rcu work to complete before tearing
 	 * down data structures. call_rcu worker may be trying to
@@ -3490,10 +3600,15 @@ exit_options:
 	/* Ensure all prior call_rcu are done. */
 	rcu_barrier();
-
+exit_tracker:
+	untrack_stdio();
+	/*
+	 * fd_tracker_destroy() will log the contents of the fd-tracker
+	 * if a leak is detected.
+	 */
 	fd_tracker_destroy(the_fd_tracker);
 
 	rcu_unregister_thread();
-
+exit_options:
 	if (!retval) {
 		exit(EXIT_SUCCESS);
 	} else {