X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fagent-thread.c;h=44f41059bc7ae2cd277b590a67423cc4a9df7b7c;hp=6790a63c642bd78bb37484a6c6af0b1af39081c7;hb=2463b7879c00298daa79744cdaae82ac061a4ed8;hpb=fefd409b002735b415c5f653cdb2587be454f145 diff --git a/src/bin/lttng-sessiond/agent-thread.c b/src/bin/lttng-sessiond/agent-thread.c index 6790a63c6..44f41059b 100644 --- a/src/bin/lttng-sessiond/agent-thread.c +++ b/src/bin/lttng-sessiond/agent-thread.c @@ -1,21 +1,11 @@ /* - * Copyright (C) 2013 - David Goulet + * Copyright (C) 2013 David Goulet * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include #include @@ -27,9 +17,27 @@ #include "fd-limit.h" #include "agent-thread.h" +#include "agent.h" #include "lttng-sessiond.h" #include "session.h" #include "utils.h" +#include "thread.h" + +struct thread_notifiers { + struct lttng_pipe *quit_pipe; + sem_t ready; +}; + +struct agent_app_id { + pid_t pid; + enum lttng_domain_type domain; +}; + +struct agent_protocol_version { + unsigned int major, minor; +}; + +static int agent_tracing_enabled = -1; /* * Note that there is not port here. It's set after this URI is parsed so we @@ -43,76 +51,45 @@ static const char *default_reg_uri = * Update agent application using the given socket. This is done just after * registration was successful. * - * This is a quite heavy call in terms of locking since the session list lock - * AND session lock are acquired. + * This will acquire the various sessions' lock; none must be held by the + * caller. + * The caller must hold the session list lock. */ -static void update_agent_app(struct agent_app *app) +static void update_agent_app(const struct agent_app *app) { struct ltt_session *session, *stmp; struct ltt_session_list *list; + struct agent *trigger_agent; + struct lttng_ht_iter iter; list = session_get_list(); assert(list); - session_lock_list(); cds_list_for_each_entry_safe(session, stmp, &list->head, list) { + if (!session_get(session)) { + continue; + } + session_lock(session); if (session->ust_session) { - struct agent *agt; + const struct agent *agt; + rcu_read_lock(); agt = trace_ust_find_agent(session->ust_session, app->domain); if (agt) { - agent_update(agt, app->sock->fd); + agent_update(agt, app); } + rcu_read_unlock(); } session_unlock(session); + session_put(session); } - session_unlock_list(); -} - -/* - * Destroy a agent application by socket. - */ -static void destroy_agent_app(int sock) -{ - struct agent_app *app; - - assert(sock >= 0); - - /* - * Not finding an application is a very important error that should NEVER - * happen. The hash table deletion is ONLY done through this call even on - * thread cleanup. - */ - rcu_read_lock(); - app = agent_find_app_by_sock(sock); - assert(app); - rcu_read_unlock(); - - /* RCU read side lock is taken in this function call. */ - agent_delete_app(app); - - /* The application is freed in a RCU call but the socket is closed here. */ - agent_destroy_app(app); -} - -/* - * Cleanup remaining agent apps in the hash table. This should only be called in - * the exit path of the thread. - */ -static void clean_agent_apps_ht(void) -{ - struct lttng_ht_node_ulong *node; - struct lttng_ht_iter iter; - - DBG3("[agent-thread] Cleaning agent apps ht"); + /* Do we need more locking here? maybe against trigger add? */ rcu_read_lock(); - cds_lfht_for_each_entry(agent_apps_ht_by_sock->ht, &iter.iter, node, node) { - struct agent_app *app; - - app = caa_container_of(node, struct agent_app, node); - destroy_agent_app(app->sock->fd); + cds_lfht_for_each_entry (trigger_agents_ht_by_domain->ht, &iter.iter, + trigger_agent, node.node) { + agent_update(trigger_agent, app); } rcu_read_unlock(); } @@ -125,6 +102,8 @@ static struct lttcomm_sock *init_tcp_socket(void) int ret; struct lttng_uri *uri = NULL; struct lttcomm_sock *sock = NULL; + unsigned int port; + bool bind_succeeded = false; /* * This should never fail since the URI is hardcoded and the port is set @@ -132,8 +111,8 @@ static struct lttcomm_sock *init_tcp_socket(void) */ ret = uri_parse(default_reg_uri, &uri); assert(ret); - assert(agent_tcp_port); - uri->port = agent_tcp_port; + assert(config.agent_tcp_port.begin > 0); + uri->port = config.agent_tcp_port.begin; sock = lttcomm_alloc_sock_from_uri(uri); uri_free(uri); @@ -147,11 +126,43 @@ static struct lttcomm_sock *init_tcp_socket(void) goto error; } - ret = sock->ops->bind(sock); - if (ret < 0) { - WARN("Another session daemon is using this agent port. Agent support " - "will be deactivated to prevent interfering with the tracing."); - goto error; + for (port = config.agent_tcp_port.begin; + port <= config.agent_tcp_port.end; port++) { + ret = lttcomm_sock_set_port(sock, (uint16_t) port); + if (ret) { + ERR("[agent-thread] Failed to set port %u on socket", + port); + goto error; + } + DBG3("[agent-thread] Trying to bind on port %u", port); + ret = sock->ops->bind(sock); + if (!ret) { + bind_succeeded = true; + break; + } + + if (errno == EADDRINUSE) { + DBG("Failed to bind to port %u since it is already in use", + port); + } else { + PERROR("Failed to bind to port %u", port); + goto error; + } + } + + if (!bind_succeeded) { + if (config.agent_tcp_port.begin == config.agent_tcp_port.end) { + WARN("Another process is already using the agent port %i. " + "Agent support will be deactivated.", + config.agent_tcp_port.begin); + goto error; + } else { + WARN("All ports in the range [%i, %i] are already in use. " + "Agent support will be deactivated.", + config.agent_tcp_port.begin, + config.agent_tcp_port.end); + goto error; + } } ret = sock->ops->listen(sock, -1); @@ -160,7 +171,7 @@ static struct lttcomm_sock *init_tcp_socket(void) } DBG("[agent-thread] Listening on TCP port %u and socket %d", - agent_tcp_port, sock->fd); + port, sock->fd); return sock; @@ -176,31 +187,75 @@ error: */ static void destroy_tcp_socket(struct lttcomm_sock *sock) { + int ret; + uint16_t port; + assert(sock); - DBG3("[agent-thread] Destroy TCP socket on port %u", agent_tcp_port); + ret = lttcomm_sock_get_port(sock, &port); + if (ret) { + ERR("[agent-thread] Failed to get port of agent TCP socket"); + port = 0; + } + + DBG3("[agent-thread] Destroy TCP socket on port %" PRIu16, + port); /* This will return gracefully if fd is invalid. */ sock->ops->close(sock); lttcomm_destroy_sock(sock); } +static const char *domain_type_str(enum lttng_domain_type domain_type) +{ + switch (domain_type) { + case LTTNG_DOMAIN_NONE: + return "none"; + case LTTNG_DOMAIN_KERNEL: + return "kernel"; + case LTTNG_DOMAIN_UST: + return "ust"; + case LTTNG_DOMAIN_JUL: + return "jul"; + case LTTNG_DOMAIN_LOG4J: + return "log4j"; + case LTTNG_DOMAIN_PYTHON: + return "python"; + default: + return "unknown"; + } +} + +static bool is_agent_protocol_version_supported( + const struct agent_protocol_version *version) +{ + const bool is_supported = version->major == AGENT_MAJOR_VERSION && + version->minor == AGENT_MINOR_VERSION; + + if (!is_supported) { + WARN("Refusing agent connection: unsupported protocol version %ui.%ui, expected %i.%i", + version->major, version->minor, + AGENT_MAJOR_VERSION, AGENT_MINOR_VERSION); + } + + return is_supported; +} + /* - * Handle a new agent registration using the reg socket. After that, a new - * agent application is added to the global hash table and attach to an UST app - * object. If r_app is not NULL, the created app is set to the pointer. + * Handle a new agent connection on the registration socket. * - * Return the new FD created upon accept() on success or else a negative errno - * value. + * Returns 0 on success, or else a negative errno value. + * On success, the resulting socket is returned through `agent_app_socket` + * and the application's reported id is updated through `agent_app_id`. */ -static int handle_registration(struct lttcomm_sock *reg_sock, - struct agent_app **r_app) +static int accept_agent_connection( + struct lttcomm_sock *reg_sock, + struct agent_app_id *agent_app_id, + struct lttcomm_sock **agent_app_socket) { int ret; - pid_t pid; + struct agent_protocol_version agent_version; ssize_t size; - enum lttng_domain_type domain; - struct agent_app *app; struct agent_register_msg msg; struct lttcomm_sock *new_sock; @@ -209,60 +264,100 @@ static int handle_registration(struct lttcomm_sock *reg_sock, new_sock = reg_sock->ops->accept(reg_sock); if (!new_sock) { ret = -ENOTCONN; - goto error; + goto end; } size = new_sock->ops->recvmsg(new_sock, &msg, sizeof(msg), 0); if (size < sizeof(msg)) { - ret = -errno; - goto error_socket; + if (size < 0) { + PERROR("Failed to register new agent application"); + } else if (size != 0) { + ERR("Failed to register new agent application: invalid registration message length: expected length = %zu, message length = %zd", + sizeof(msg), size); + } else { + DBG("Failed to register new agent application: connection closed"); + } + ret = -EINVAL; + goto error_close_socket; } - domain = be32toh(msg.domain); - pid = be32toh(msg.pid); - DBG2("[agent-thread] New registration for pid %d domain %d on socket %d", - pid, domain, new_sock->fd); + agent_version = (struct agent_protocol_version) { + be32toh(msg.major_version), + be32toh(msg.minor_version), + }; - app = agent_create_app(pid, domain, new_sock); - if (!app) { - ret = -ENOMEM; - goto error_socket; + /* Test communication protocol version of the registering agent. */ + if (!is_agent_protocol_version_supported(&agent_version)) { + ret = -EINVAL; + goto error_close_socket; } - /* - * Add before assigning the socket value to the UST app so it can be found - * concurrently. - */ - agent_add_app(app); - - /* - * We don't need to attach the agent app to the app. If we ever do so, we - * should consider both registration order of agent before app and app - * before agent. - */ + *agent_app_id = (struct agent_app_id) { + .domain = (enum lttng_domain_type) be32toh(msg.domain), + .pid = (pid_t) be32toh(msg.pid), + }; - if (r_app) { - *r_app = app; - } + DBG2("New registration for agent application: pid = %ld, domain = %s, socket fd = %d", + (long) agent_app_id->pid, + domain_type_str(agent_app_id->domain), new_sock->fd); - return new_sock->fd; + *agent_app_socket = new_sock; + new_sock = NULL; + ret = 0; + goto end; -error_socket: +error_close_socket: new_sock->ops->close(new_sock); lttcomm_destroy_sock(new_sock); -error: +end: return ret; } +bool agent_tracing_is_enabled(void) +{ + int enabled; + + enabled = uatomic_read(&agent_tracing_enabled); + assert(enabled != -1); + return enabled == 1; +} + +/* + * Write agent TCP port using the rundir. + */ +static int write_agent_port(uint16_t port) +{ + return utils_create_pid_file((pid_t) port, + config.agent_port_file_path.value); +} + +static +void mark_thread_as_ready(struct thread_notifiers *notifiers) +{ + DBG("Marking agent management thread as ready"); + sem_post(¬ifiers->ready); +} + +static +void wait_until_thread_is_ready(struct thread_notifiers *notifiers) +{ + DBG("Waiting for agent management thread to be ready"); + sem_wait(¬ifiers->ready); + DBG("Agent management thread is ready"); +} + /* * This thread manage application notify communication. */ -void *agent_thread_manage_registration(void *data) +static void *thread_agent_management(void *data) { int i, ret, pollfd; uint32_t revents, nb_fd; struct lttng_poll_event events; struct lttcomm_sock *reg_sock; + struct thread_notifiers *notifiers = data; + const int quit_pipe_read_fd = lttng_pipe_get_readfd( + notifiers->quit_pipe); DBG("[agent-thread] Manage agent application registration."); @@ -272,18 +367,45 @@ void *agent_thread_manage_registration(void *data) /* Agent initialization call MUST be called before starting the thread. */ assert(agent_apps_ht_by_sock); - /* Create pollset with size 2, quit pipe and socket. */ - ret = sessiond_set_thread_pollset(&events, 2); + /* Create pollset with size 2, quit pipe and registration socket. */ + ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); if (ret < 0) { goto error_poll_create; } + ret = lttng_poll_add(&events, quit_pipe_read_fd, + LPOLLIN | LPOLLERR); + if (ret < 0) { + goto error_tcp_socket; + } + reg_sock = init_tcp_socket(); - if (!reg_sock) { + if (reg_sock) { + uint16_t port; + + assert(lttcomm_sock_get_port(reg_sock, &port) == 0); + + ret = write_agent_port(port); + if (ret) { + ERR("[agent-thread] Failed to create agent port file: agent tracing will be unavailable"); + /* Don't prevent the launch of the sessiond on error. */ + mark_thread_as_ready(notifiers); + goto error; + } + } else { + /* Don't prevent the launch of the sessiond on error. */ + mark_thread_as_ready(notifiers); goto error_tcp_socket; } - /* Add create valid TCP socket to poll set. */ + /* + * Signal that the agent thread is ready. The command thread + * may start to query whether or not agent tracing is enabled. + */ + uatomic_set(&agent_tracing_enabled, 1); + mark_thread_as_ready(notifiers); + + /* Add TCP socket to the poll set. */ ret = lttng_poll_add(&events, reg_sock->fd, LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); if (ret < 0) { @@ -291,12 +413,13 @@ void *agent_thread_manage_registration(void *data) } while (1) { - DBG3("[agent-thread] Manage agent polling on %d fds", - LTTNG_POLL_GETNB(&events)); + DBG3("[agent-thread] Manage agent polling"); /* Inifinite blocking call, waiting for transmission */ restart: ret = lttng_poll_wait(&events, -1); + DBG3("[agent-thread] Manage agent return from poll on %d fds", + LTTNG_POLL_GETNB(&events)); if (ret < 0) { /* * Restart interrupted system call. @@ -315,55 +438,93 @@ restart: pollfd = LTTNG_POLL_GETFD(&events, i); /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); - if (ret) { + if (pollfd == quit_pipe_read_fd) { goto exit; } - /* - * Check first if this is a POLLERR since POLLIN is also included - * in an error value thus checking first. - */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - /* Removing from the poll set */ - ret = lttng_poll_del(&events, pollfd); - if (ret < 0) { - goto error; - } + /* Activity on the registration socket. */ + if (revents & LPOLLIN) { + struct agent_app_id new_app_id; + struct agent_app *new_app = NULL; + struct lttcomm_sock *new_app_socket; + int new_app_socket_fd; - destroy_agent_app(pollfd); - } else if (revents & (LPOLLIN)) { - int new_fd; - struct agent_app *app = NULL; - - /* Pollin event of agent app socket should NEVER happen. */ assert(pollfd == reg_sock->fd); - new_fd = handle_registration(reg_sock, &app); - if (new_fd < 0) { - WARN("[agent-thread] agent registration failed. Ignoring."); - /* Somehow the communication failed. Just continue. */ + ret = accept_agent_connection( + reg_sock, &new_app_id, &new_app_socket); + if (ret < 0) { + /* Errors are already logged. */ continue; } - /* Should not have a NULL app on success. */ - assert(app); - /* Only add poll error event to only detect shutdown. */ - ret = lttng_poll_add(&events, new_fd, + /* + * new_app_socket's ownership has been + * transferred to the new agent app. + */ + new_app = agent_create_app(new_app_id.pid, + new_app_id.domain, + new_app_socket); + if (!new_app) { + new_app_socket->ops->close( + new_app_socket); + continue; + } + new_app_socket_fd = new_app_socket->fd; + new_app_socket = NULL; + + /* + * Since this is a command socket (write then + * read), only add poll error event to only + * detect shutdown. + */ + ret = lttng_poll_add(&events, new_app_socket_fd, LPOLLERR | LPOLLHUP | LPOLLRDHUP); if (ret < 0) { - destroy_agent_app(new_fd); + agent_destroy_app(new_app); continue; } - /* Update newly registered app. */ - update_agent_app(app); + /* + * Prevent sessions from being modified while + * the agent application's configuration is + * updated. + */ + session_lock_list(); + + /* + * Update the newly registered applications's + * configuration. + */ + update_agent_app(new_app); - /* On failure, the poll will detect it and clean it up. */ - (void) agent_send_registration_done(app); + ret = agent_send_registration_done(new_app); + if (ret < 0) { + agent_destroy_app(new_app); + /* Removing from the poll set. */ + ret = lttng_poll_del(&events, + new_app_socket_fd); + if (ret < 0) { + session_unlock_list(); + goto error; + } + continue; + } + + /* Publish the new agent app. */ + agent_add_app(new_app); + + session_unlock_list(); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + /* Removing from the poll set */ + ret = lttng_poll_del(&events, pollfd); + if (ret < 0) { + goto error; + } + agent_destroy_app_by_sock(pollfd); } else { - ERR("Unknown poll events %u for sock %d", revents, pollfd); - continue; + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -376,14 +537,58 @@ error: error_tcp_socket: lttng_poll_clean(&events); error_poll_create: - DBG("[agent-thread] is cleaning up and stopping."); - - if (agent_apps_ht_by_sock) { - clean_agent_apps_ht(); - lttng_ht_destroy(agent_apps_ht_by_sock); - } - + uatomic_set(&agent_tracing_enabled, 0); + DBG("[agent-thread] Cleaning up and stopping."); rcu_thread_offline(); rcu_unregister_thread(); return NULL; } + +static bool shutdown_agent_management_thread(void *data) +{ + struct thread_notifiers *notifiers = data; + const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe); + + return notify_thread_pipe(write_fd) == 1; +} + +static void cleanup_agent_management_thread(void *data) +{ + struct thread_notifiers *notifiers = data; + + lttng_pipe_destroy(notifiers->quit_pipe); + sem_destroy(¬ifiers->ready); + free(notifiers); +} + +bool launch_agent_management_thread(void) +{ + struct thread_notifiers *notifiers; + struct lttng_thread *thread; + + notifiers = zmalloc(sizeof(*notifiers)); + if (!notifiers) { + goto error_alloc; + } + + sem_init(¬ifiers->ready, 0, 0); + notifiers->quit_pipe = lttng_pipe_open(FD_CLOEXEC); + if (!notifiers->quit_pipe) { + goto error; + } + thread = lttng_thread_create("Agent management", + thread_agent_management, + shutdown_agent_management_thread, + cleanup_agent_management_thread, + notifiers); + if (!thread) { + goto error; + } + wait_until_thread_is_ready(notifiers); + lttng_thread_put(thread); + return true; +error: + cleanup_agent_management_thread(notifiers); +error_alloc: + return false; +}