X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=b5ae34ebf9c9989bbd9b72417cdcd427ad9d9546;hp=db021f43fc5b28a9ac0de322752880a3c9a171c9;hb=bbc4768c20f1c552222e1746f9475d145d7bf04e;hpb=96b0ecff3ade3c7970d1cc5fb12a50a94c67ee67 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index db021f43f..b5ae34ebf 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1095,18 +1095,15 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, int ret = 0; ssize_t send_ret; struct relay_session *session = NULL; - struct lttcomm_relayd_status_session reply; - char session_name[LTTNG_NAME_MAX]; - char hostname[LTTNG_HOST_NAME_MAX]; + struct lttcomm_relayd_status_session reply = {}; + char session_name[LTTNG_NAME_MAX] = {}; + char hostname[LTTNG_HOST_NAME_MAX] = {}; uint32_t live_timer = 0; bool snapshot = false; /* Left nil for peers < 2.11. */ lttng_uuid sessiond_uuid = {}; - - memset(session_name, 0, LTTNG_NAME_MAX); - memset(hostname, 0, LTTNG_HOST_NAME_MAX); - - memset(&reply, 0, sizeof(reply)); + LTTNG_OPTIONAL(uint64_t) id_sessiond = {}; + LTTNG_OPTIONAL(uint64_t) current_chunk_id = {}; if (conn->minor < 4) { /* From 2.1 to 2.3 */ @@ -1116,16 +1113,22 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, ret = cmd_create_session_2_4(payload, session_name, hostname, &live_timer, &snapshot); } else { + bool has_current_chunk; + /* From 2.11 to ... */ ret = cmd_create_session_2_11(payload, session_name, hostname, &live_timer, &snapshot, - sessiond_uuid); + &id_sessiond.value, sessiond_uuid, + &has_current_chunk, + ¤t_chunk_id.value); if (lttng_uuid_is_nil(sessiond_uuid)) { /* The nil UUID is reserved for pre-2.11 clients. */ ERR("Illegal nil UUID announced by peer in create session command"); ret = -1; goto send_reply; } + id_sessiond.is_set = true; + current_chunk_id.is_set = has_current_chunk; } if (ret < 0) { @@ -1133,7 +1136,10 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, } session = session_create(session_name, hostname, live_timer, - snapshot, sessiond_uuid, conn->major, conn->minor); + snapshot, sessiond_uuid, + id_sessiond.is_set ? &id_sessiond.value : NULL, + current_chunk_id.is_set ? ¤t_chunk_id.value : NULL, + conn->major, conn->minor); if (!session) { ret = -1; goto send_reply; @@ -1144,15 +1150,6 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, reply.session_id = htobe64(session->id); - session->current_trace_chunk = - sessiond_trace_chunk_registry_get_anonymous_chunk( - sessiond_trace_chunk_registry, sessiond_uuid, - session->id, - opt_output_path); - if (!session->current_trace_chunk) { - ret = -1; - } - send_reply: if (ret < 0) { reply.ret_code = htobe32(LTTNG_ERR_FATAL); @@ -2537,8 +2534,6 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr size_t path_len; struct lttng_buffer_view new_path_view; - DBG("Rotate stream received"); - if (!session || !conn->version_check_done) { ERR("Trying to rotate a stream before version check"); ret = -1; @@ -2665,6 +2660,300 @@ end_no_reply: return ret; } +static int init_session_output_directory_handle(struct relay_session *session, + struct lttng_directory_handle *handle) +{ + int ret; + /* hostname/session_name */ + char *session_directory = NULL; + /* + * base path + session_directory + * e.g. /home/user/lttng-traces/hostname/session_name + */ + char *full_session_path = NULL; + + pthread_mutex_lock(&session->lock); + ret = asprintf(&session_directory, "%s/%s", session->hostname, + session->session_name); + pthread_mutex_unlock(&session->lock); + if (ret < 0) { + PERROR("Failed to format session directory name"); + goto end; + } + + full_session_path = create_output_path(session_directory); + if (!full_session_path) { + ret = -1; + goto end; + } + + ret = utils_mkdir_recursive( + full_session_path, S_IRWXU | S_IRWXG, -1, -1); + if (ret) { + ERR("Failed to create session output path \"%s\"", + full_session_path); + goto end; + } + + ret = lttng_directory_handle_init(handle, full_session_path); + if (ret) { + goto end; + } +end: + free(session_directory); + free(full_session_path); + return ret; +} + +/* + * relay_create_trace_chunk: create a new trace chunk + */ +static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) +{ + int ret = 0; + ssize_t send_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_create_trace_chunk *msg; + struct lttcomm_relayd_generic_reply reply = {}; + struct lttng_buffer_view header_view; + struct lttng_buffer_view chunk_name_view; + struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL; + enum lttng_error_code reply_code = LTTNG_OK; + enum lttng_trace_chunk_status chunk_status; + struct lttng_directory_handle session_output; + + if (!session || !conn->version_check_done) { + ERR("Trying to create a trace chunk before version check"); + ret = -1; + goto end_no_reply; + } + + if (session->major == 2 && session->minor < 11) { + ERR("Chunk creation command is unsupported before 2.11"); + ret = -1; + goto end_no_reply; + } + + header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg)); + if (!header_view.data) { + ERR("Failed to receive payload of chunk creation command"); + ret = -1; + goto end_no_reply; + } + + /* Convert to host endianness. */ + msg = (typeof(msg)) header_view.data; + msg->chunk_id = be64toh(msg->chunk_id); + msg->creation_timestamp = be64toh(msg->creation_timestamp); + msg->override_name_length = be32toh(msg->override_name_length); + + chunk = lttng_trace_chunk_create( + msg->chunk_id, msg->creation_timestamp); + if (!chunk) { + ERR("Failed to create trace chunk in trace chunk creation command"); + ret = -1; + reply_code = LTTNG_ERR_NOMEM; + goto end; + } + + if (msg->override_name_length) { + const char *name; + + chunk_name_view = lttng_buffer_view_from_view(payload, + sizeof(*msg), + msg->override_name_length); + name = chunk_name_view.data; + if (!name || name[msg->override_name_length - 1]) { + ERR("Failed to receive payload of chunk creation command"); + ret = -1; + reply_code = LTTNG_ERR_INVALID; + goto end; + } + + chunk_status = lttng_trace_chunk_override_name( + chunk, chunk_name_view.data); + switch (chunk_status) { + case LTTNG_TRACE_CHUNK_STATUS_OK: + break; + case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT: + ERR("Failed to set the name of new trace chunk in trace chunk creation command (invalid name)"); + reply_code = LTTNG_ERR_INVALID; + ret = -1; + goto end; + default: + ERR("Failed to set the name of new trace chunk in trace chunk creation command (unknown error)"); + reply_code = LTTNG_ERR_UNK; + ret = -1; + goto end; + } + } + + ret = init_session_output_directory_handle( + conn->session, &session_output); + if (ret) { + reply_code = LTTNG_ERR_CREATE_DIR_FAIL; + goto end; + } + + chunk_status = lttng_trace_chunk_set_credentials_current_user(chunk); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + reply_code = LTTNG_ERR_UNK; + ret = -1; + goto end; + } + + chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + reply_code = LTTNG_ERR_UNK; + ret = -1; + goto end; + } + + published_chunk = sessiond_trace_chunk_registry_publish_chunk( + sessiond_trace_chunk_registry, + conn->session->sessiond_uuid, + conn->session->id, + chunk); + if (!published_chunk) { + char uuid_str[UUID_STR_LEN]; + + lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); + ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, + uuid_str, + conn->session->id, + msg->chunk_id); + ret = -1; + reply_code = LTTNG_ERR_NOMEM; + goto end; + } + + pthread_mutex_lock(&conn->session->lock); + lttng_trace_chunk_put(conn->session->current_trace_chunk); + conn->session->current_trace_chunk = published_chunk; + pthread_mutex_unlock(&conn->session->lock); + published_chunk = NULL; + +end: + reply.ret_code = htobe32((uint32_t) reply_code); + send_ret = conn->sock->ops->sendmsg(conn->sock, + &reply, + sizeof(struct lttcomm_relayd_generic_reply), + 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)", + send_ret); + ret = -1; + } +end_no_reply: + lttng_trace_chunk_put(chunk); + lttng_trace_chunk_put(published_chunk); + lttng_directory_handle_fini(&session_output); + return ret; +} + +/* + * relay_close_trace_chunk: close a trace chunk + */ +static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) +{ + int ret = 0; + ssize_t send_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_close_trace_chunk *msg; + struct lttcomm_relayd_generic_reply reply = {}; + struct lttng_buffer_view header_view; + struct lttng_trace_chunk *chunk = NULL; + enum lttng_error_code reply_code = LTTNG_OK; + enum lttng_trace_chunk_status chunk_status; + uint64_t chunk_id; + LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command; + time_t close_timestamp; + + if (!session || !conn->version_check_done) { + ERR("Trying to close a trace chunk before version check"); + ret = -1; + goto end_no_reply; + } + + if (session->major == 2 && session->minor < 11) { + ERR("Chunk close command is unsupported before 2.11"); + ret = -1; + goto end_no_reply; + } + + header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg)); + if (!header_view.data) { + ERR("Failed to receive payload of chunk close command"); + ret = -1; + goto end_no_reply; + } + + /* Convert to host endianness. */ + msg = (typeof(msg)) header_view.data; + chunk_id = be64toh(msg->chunk_id); + close_timestamp = (time_t) be64toh(msg->close_timestamp); + close_command = (typeof(close_command)){ + .value = be32toh(msg->close_command.value), + .is_set = msg->close_command.is_set, + }; + + chunk = sessiond_trace_chunk_registry_get_chunk( + sessiond_trace_chunk_registry, + conn->session->sessiond_uuid, + conn->session->id, + chunk_id); + if (!chunk) { + char uuid_str[UUID_STR_LEN]; + + lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); + ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, + uuid_str, + conn->session->id, + msg->chunk_id); + ret = -1; + reply_code = LTTNG_ERR_NOMEM; + goto end; + } + + chunk_status = lttng_trace_chunk_set_close_timestamp( + chunk, close_timestamp); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to set trace chunk close timestamp"); + ret = -1; + reply_code = LTTNG_ERR_UNK; + goto end; + } + + if (close_command.is_set) { + chunk_status = lttng_trace_chunk_set_close_command( + chunk, close_command.value); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + reply_code = LTTNG_ERR_INVALID; + goto end; + } + } + +end: + reply.ret_code = htobe32((uint32_t) reply_code); + send_ret = conn->sock->ops->sendmsg(conn->sock, + &reply, + sizeof(struct lttcomm_relayd_generic_reply), + 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)", + send_ret); + ret = -1; + } +end_no_reply: + lttng_trace_chunk_put(chunk); + return ret; +} + #define DBG_CMD(cmd_name, conn) \ DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd); @@ -2731,6 +3020,14 @@ static int relay_process_control_command(struct relay_connection *conn, DBG_CMD("RELAYD_ROTATE_STREAM", conn); ret = relay_rotate_session_stream(header, conn, payload); break; + case RELAYD_CREATE_TRACE_CHUNK: + DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn); + ret = relay_create_trace_chunk(header, conn, payload); + break; + case RELAYD_CLOSE_TRACE_CHUNK: + DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn); + ret = relay_close_trace_chunk(header, conn, payload); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", header->cmd);