X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=d874a9c50bdcabfbbc5bf0bc49c584e1eb635665;hp=2ffa75078779b7d0b9986696f5643267ca5ea7aa;hb=8614e600d7a8dc653c473254fc302870d73f32ae;hpb=8d382dd4d4e95ea6ff88d6bd9f8a8fc85970ee3b diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 2ffa75078..d874a9c50 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -62,21 +62,23 @@ #include #include +#include "backward-compatibility-group-by.h" #include "cmd.h" +#include "connection.h" #include "ctf-trace.h" +#include "health-relayd.h" #include "index.h" -#include "utils.h" -#include "lttng-relayd.h" #include "live.h" -#include "health-relayd.h" -#include "testpoint.h" -#include "viewer-stream.h" +#include "lttng-relayd.h" #include "session.h" +#include "sessiond-trace-chunks.h" #include "stream.h" -#include "connection.h" -#include "tracefile-array.h" #include "tcp_keep_alive.h" -#include "sessiond-trace-chunks.h" +#include "testpoint.h" +#include "tracefile-array.h" +#include "utils.h" +#include "version.h" +#include "viewer-stream.h" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -95,8 +97,9 @@ enum relay_connection_status { }; /* command line options */ -char *opt_output_path; -static int opt_daemon, opt_background; +char *opt_output_path, *opt_working_directory; +static int opt_daemon, opt_background, opt_print_version, opt_allow_clear = 1; +enum relay_group_output_by opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_UNKNOWN; /* * We need to wait for listener and live listener threads, as well as @@ -183,11 +186,32 @@ static struct option long_options[] = { { "verbose", 0, 0, 'v', }, { "config", 1, 0, 'f' }, { "version", 0, 0, 'V' }, + { "working-directory", 1, 0, 'w', }, + { "group-output-by-session", 0, 0, 's', }, + { "group-output-by-host", 0, 0, 'p', }, + { "disallow-clear", 0, 0, 'x' }, { NULL, 0, 0, 0, }, }; static const char *config_ignore_options[] = { "help", "config", "version" }; +static void print_version(void) { + fprintf(stdout, "%s\n", VERSION); +} + +static void relayd_config_log(void) +{ + DBG("LTTng-relayd " VERSION " - " VERSION_NAME "%s%s", + GIT_VERSION[0] == '\0' ? "" : " - " GIT_VERSION, + EXTRA_VERSION_NAME[0] == '\0' ? "" : " - " EXTRA_VERSION_NAME); + if (EXTRA_VERSION_DESCRIPTION[0] != '\0') { + DBG("LTTng-relayd extra version description:\n\t" EXTRA_VERSION_DESCRIPTION "\n"); + } + if (EXTRA_VERSION_PATCHES[0] != '\0') { + DBG("LTTng-relayd extra patches:\n\t" EXTRA_VERSION_PATCHES "\n"); + } +} + /* * Take an option from the getopt output and set it in the right variable to be * used later. @@ -278,8 +302,8 @@ static int set_option(int opt, const char *arg, const char *optname) } exit(EXIT_FAILURE); case 'V': - fprintf(stdout, "%s\n", VERSION); - exit(EXIT_SUCCESS); + opt_print_version = 1; + break; case 'o': if (lttng_is_setuid_setgid()) { WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.", @@ -293,6 +317,20 @@ static int set_option(int opt, const char *arg, const char *optname) } } break; + case 'w': + if (lttng_is_setuid_setgid()) { + WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.", + "-w, --working-directory"); + } else { + ret = asprintf(&opt_working_directory, "%s", arg); + if (ret < 0) { + ret = -errno; + PERROR("asprintf opt_working_directory"); + goto end; + } + } + break; + case 'v': /* Verbose level can increase using multiple -v */ if (arg) { @@ -304,6 +342,24 @@ static int set_option(int opt, const char *arg, const char *optname) } } break; + case 's': + if (opt_group_output_by != RELAYD_GROUP_OUTPUT_BY_UNKNOWN) { + ERR("Cannot set --group-output-by-session, another --group-output-by argument is present"); + exit(EXIT_FAILURE); + } + opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_SESSION; + break; + case 'p': + if (opt_group_output_by != RELAYD_GROUP_OUTPUT_BY_UNKNOWN) { + ERR("Cannot set --group-output-by-host, another --group-output-by argument is present"); + exit(EXIT_FAILURE); + } + opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_HOST; + break; + case 'x': + /* Disallow clear */ + opt_allow_clear = 0; + break; default: /* Unknown option or other error. * Error is printed by getopt, just return */ @@ -373,6 +429,23 @@ end: return ret; } +static int parse_env_options(void) +{ + int ret = 0; + char *value = NULL; + + value = lttng_secure_getenv(DEFAULT_LTTNG_RELAYD_WORKING_DIRECTORY_ENV); + if (value) { + opt_working_directory = strdup(value); + if (!opt_working_directory) { + ERR("Failed to allocate working directory string (\"%s\")", + value); + ret = -1; + } + } + return ret; +} + static int set_options(int argc, char **argv) { int c, ret = 0, option_index = 0, retval = 0; @@ -491,6 +564,23 @@ static int set_options(int argc, char **argv) } } + if (opt_group_output_by == RELAYD_GROUP_OUTPUT_BY_UNKNOWN) { + opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_HOST; + } + if (opt_allow_clear) { + /* Check if env variable exists. */ + const char *value = lttng_secure_getenv(DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV); + if (value) { + ret = config_parse_value(value); + if (ret < 0) { + ERR("Invalid value for %s specified", DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV); + retval = -1; + goto exit; + } + opt_allow_clear = !ret; + } + } + exit: free(optstring); return retval; @@ -523,8 +613,8 @@ static void relayd_cleanup(void) if (sessions_ht) lttng_ht_destroy(sessions_ht); - /* free the dynamically allocated opt_output_path */ free(opt_output_path); + free(opt_working_directory); /* Close thread quit pipes */ utils_close_pipe(thread_quit_pipe); @@ -1299,12 +1389,57 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, goto send_reply; } + /* + * Backward compatibility for --group-output-by-session. + * Prior to lttng 2.11, the complete path is passed by the stream. + * Starting at 2.11, lttng-relayd uses chunk. When dealing with producer + * >=2.11 the chunk is responsible for the output path. When dealing + * with producer < 2.11 the chunk output_path is the root output path + * and the stream carries the complete path (path_name). + * To support --group-output-by-session with older producer (<2.11), we + * need to craft the path based on the stream path. + */ + if (opt_group_output_by == RELAYD_GROUP_OUTPUT_BY_SESSION) { + if (conn->minor < 4) { + /* + * From 2.1 to 2.3, the session_name is not passed on + * the RELAYD_CREATE_SESSION command. The session name + * is necessary to detect the presence of a base_path + * inside the stream path. Without it we cannot perform + * a valid group-output-by-session transformation. + */ + WARN("Unable to perform a --group-by-session transformation for session %" PRIu64 + " for stream with path \"%s\" as it is produced by a peer using a protocol older than v2.4", + session->id, path_name); + } else if (conn->minor >= 4 && conn->minor < 11) { + char *group_by_session_path_name; + + assert(session->session_name[0] != '\0'); + + group_by_session_path_name = + backward_compat_group_by_session( + path_name, + session->session_name); + if (!group_by_session_path_name) { + ERR("Failed to apply group by session to stream of session %" PRIu64, + session->id); + goto send_reply; + } + + DBG("Transformed session path from \"%s\" to \"%s\" to honor per-session name grouping", + path_name, group_by_session_path_name); + + free(path_name); + path_name = group_by_session_path_name; + } + } + trace = ctf_trace_get_by_path_or_create(session, path_name); if (!trace) { goto send_reply; } - /* This stream here has one reference on the trace. */ + /* This stream here has one reference on the trace. */ pthread_mutex_lock(&last_relay_stream_id_lock); stream_handle = ++last_relay_stream_id; pthread_mutex_unlock(&last_relay_stream_id_lock); @@ -1404,7 +1539,7 @@ static int relay_close_stream(const struct lttcomm_relayd_hdr *recv_hdr, vstream = viewer_stream_get_by_id(stream->stream_handle); if (vstream) { - if (vstream->metadata_sent == stream->metadata_received) { + if (stream->no_new_metadata_notified) { /* * Since all the metadata has been sent to the * viewer and that we have a request to close @@ -2071,6 +2206,9 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, index_info.stream_instance_id = be64toh(index_info.stream_instance_id); index_info.packet_seq_num = be64toh(index_info.packet_seq_num); + } else { + index_info.stream_instance_id = -1ULL; + index_info.packet_seq_num = -1ULL; } stream = stream_get_by_id(index_info.relay_stream_id); @@ -2215,7 +2353,7 @@ static int relay_rotate_session_streams( session->sessiond_uuid, session->id, rotate_streams.new_chunk_id.value); if (!next_trace_chunk) { - char uuid_str[UUID_STR_LEN]; + char uuid_str[LTTNG_UUID_STR_LEN]; lttng_uuid_to_str(session->sessiond_uuid, uuid_str); ERR("Unknown next trace chunk in ROTATE_STREAMS command: sessiond_uuid = {%s}, session_id = %" PRIu64 @@ -2282,6 +2420,7 @@ static int relay_rotate_session_streams( } reply_code = LTTNG_OK; + ret = 0; end: if (stream) { stream_put(stream); @@ -2295,8 +2434,6 @@ end: send_ret); ret = -1; } - - ret = 0; end_no_reply: lttng_trace_chunk_put(next_trace_chunk); return ret; @@ -2321,7 +2458,7 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL; enum lttng_error_code reply_code = LTTNG_OK; enum lttng_trace_chunk_status chunk_status; - struct lttng_directory_handle session_output; + struct lttng_directory_handle *session_output = NULL; if (!session || !conn->version_check_done) { ERR("Trying to create a trace chunk before version check"); @@ -2396,14 +2533,15 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, goto end; } - ret = session_init_output_directory_handle( - conn->session, &session_output); - if (ret) { + session_output = session_create_output_directory_handle( + conn->session); + if (!session_output) { reply_code = LTTNG_ERR_CREATE_DIR_FAIL; goto end; } - chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output); - lttng_directory_handle_fini(&session_output); + chunk_status = lttng_trace_chunk_set_as_owner(chunk, session_output); + lttng_directory_handle_put(session_output); + session_output = NULL; if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { reply_code = LTTNG_ERR_UNK; ret = -1; @@ -2416,7 +2554,7 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, conn->session->id, chunk); if (!published_chunk) { - char uuid_str[UUID_STR_LEN]; + char uuid_str[LTTNG_UUID_STR_LEN]; lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, @@ -2459,6 +2597,7 @@ end: end_no_reply: lttng_trace_chunk_put(chunk); lttng_trace_chunk_put(published_chunk); + lttng_directory_handle_put(session_output); return ret; } @@ -2469,7 +2608,7 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn, const struct lttng_buffer_view *payload) { - int ret = 0; + int ret = 0, buf_ret; ssize_t send_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_close_trace_chunk *msg; @@ -2522,7 +2661,7 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, conn->session->id, chunk_id); if (!chunk) { - char uuid_str[UUID_STR_LEN]; + char uuid_str[LTTNG_UUID_STR_LEN]; lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, @@ -2630,17 +2769,17 @@ end_unlock_session: end: reply.generic.ret_code = htobe32((uint32_t) reply_code); reply.path_length = htobe32((uint32_t) path_length); - ret = lttng_dynamic_buffer_append( + buf_ret = lttng_dynamic_buffer_append( &reply_payload, &reply, sizeof(reply)); - if (ret) { + if (buf_ret) { ERR("Failed to append \"close trace chunk\" command reply header to payload buffer"); goto end_no_reply; } if (reply_code == LTTNG_OK) { - ret = lttng_dynamic_buffer_append(&reply_payload, + buf_ret = lttng_dynamic_buffer_append(&reply_payload, closed_trace_chunk_path, path_length); - if (ret) { + if (buf_ret) { ERR("Failed to append \"close trace chunk\" command reply path to payload buffer"); goto end_no_reply; } @@ -2675,8 +2814,8 @@ static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_trace_chunk_exists *msg; struct lttcomm_relayd_trace_chunk_exists_reply reply = {}; struct lttng_buffer_view header_view; - struct lttng_trace_chunk *chunk = NULL; uint64_t chunk_id; + bool chunk_exists; if (!session || !conn->version_check_done) { ERR("Trying to close a trace chunk before version check"); @@ -2701,25 +2840,80 @@ static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr *recv_hdr, msg = (typeof(msg)) header_view.data; chunk_id = be64toh(msg->chunk_id); - chunk = sessiond_trace_chunk_registry_get_chunk( + ret = sessiond_trace_chunk_registry_chunk_exists( sessiond_trace_chunk_registry, conn->session->sessiond_uuid, conn->session->id, - chunk_id); - - reply = (typeof(reply)) { - .generic.ret_code = htobe32((uint32_t) LTTNG_OK), - .trace_chunk_exists = !!chunk, + chunk_id, &chunk_exists); + /* + * If ret is not 0, send the reply and report the error to the caller. + * It is a protocol (or internal) error and the session/connection + * should be torn down. + */ + reply = (typeof(reply)){ + .generic.ret_code = htobe32((uint32_t) + (ret == 0 ? LTTNG_OK : LTTNG_ERR_INVALID_PROTOCOL)), + .trace_chunk_exists = ret == 0 ? chunk_exists : 0, }; - send_ret = conn->sock->ops->sendmsg(conn->sock, - &reply, sizeof(reply), 0); + send_ret = conn->sock->ops->sendmsg( + conn->sock, &reply, sizeof(reply), 0); if (send_ret < (ssize_t) sizeof(reply)) { ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)", send_ret); ret = -1; } end_no_reply: - lttng_trace_chunk_put(chunk); + return ret; +} + +/* + * relay_get_configuration: query whether feature is available + */ +static int relay_get_configuration(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) +{ + int ret = 0; + ssize_t send_ret; + struct lttcomm_relayd_get_configuration *msg; + struct lttcomm_relayd_get_configuration_reply reply = {}; + struct lttng_buffer_view header_view; + uint64_t query_flags = 0; + uint64_t result_flags = 0; + + header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg)); + if (!header_view.data) { + ERR("Failed to receive payload of chunk close command"); + ret = -1; + goto end_no_reply; + } + + /* Convert to host endianness. */ + msg = (typeof(msg)) header_view.data; + query_flags = be64toh(msg->query_flags); + + if (query_flags) { + ret = LTTNG_ERR_INVALID_PROTOCOL; + goto reply; + } + if (opt_allow_clear) { + result_flags |= LTTCOMM_RELAYD_CONFIGURATION_FLAG_CLEAR_ALLOWED; + } + ret = 0; +reply: + reply = (typeof(reply)){ + .generic.ret_code = htobe32((uint32_t) + (ret == 0 ? LTTNG_OK : LTTNG_ERR_INVALID_PROTOCOL)), + .relayd_configuration_flags = htobe64(result_flags), + }; + send_ret = conn->sock->ops->sendmsg( + conn->sock, &reply, sizeof(reply), 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"get configuration\" command reply (ret = %zd)", + send_ret); + ret = -1; + } +end_no_reply: return ret; } @@ -2801,6 +2995,10 @@ static int relay_process_control_command(struct relay_connection *conn, DBG_CMD("RELAYD_TRACE_CHUNK_EXISTS", conn); ret = relay_trace_chunk_exists(header, conn, payload); break; + case RELAYD_GET_CONFIGURATION: + DBG_CMD("RELAYD_GET_CONFIGURATION", conn); + ret = relay_get_configuration(header, conn, payload); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", header->cmd); @@ -3400,8 +3598,13 @@ restart: if (ret < 0) { goto error; } - lttng_poll_add(&events, conn->sock->fd, + ret = lttng_poll_add(&events, + conn->sock->fd, LPOLLIN | LPOLLRDHUP); + if (ret) { + ERR("Failed to add new connection file descriptor to poll set"); + goto error; + } connection_ht_add(relay_connections_ht, conn); DBG("Connection socket %d added", conn->sock->fd); } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { @@ -3645,7 +3848,17 @@ int main(int argc, char **argv) int ret = 0, retval = 0; void *status; - /* Parse arguments */ + /* Parse environment variables */ + ret = parse_env_options(); + if (ret) { + retval = -1; + goto exit_options; + } + + /* + * Parse arguments. + * Command line arguments overwrite environment. + */ progname = argv[0]; if (set_options(argc, argv)) { retval = -1; @@ -3657,6 +3870,22 @@ int main(int argc, char **argv) goto exit_options; } + relayd_config_log(); + + if (opt_print_version) { + print_version(); + retval = 0; + goto exit_options; + } + + ret = fclose(stdin); + if (ret) { + PERROR("Failed to close stdin"); + goto exit_options; + } + + DBG("Clear command %s", opt_allow_clear ? "allowed" : "disallowed"); + /* Try to create directory if -o, --output is specified. */ if (opt_output_path) { if (*opt_output_path != '/') { @@ -3695,6 +3924,14 @@ int main(int argc, char **argv) } } + if (opt_working_directory) { + ret = utils_change_working_directory(opt_working_directory); + if (ret) { + /* All errors are already logged. */ + goto exit_options; + } + } + sessiond_trace_chunk_registry = sessiond_trace_chunk_registry_create(); if (!sessiond_trace_chunk_registry) { ERR("Failed to initialize session daemon trace chunk registry");