From 882093eef6fdd658833928a62be5d42fc0cdcb00 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Tue, 14 Jul 2020 14:56:37 -0400 Subject: [PATCH] sessiond: notification: use lttng_payload for communications MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Allows passing of fds related to object (e.g userspace probes). Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau Change-Id: I7bb3a91c71016b2939b0e05aca60d57c2da14a20 --- include/lttng/notification/channel-internal.h | 6 +- .../notification-thread-events.c | 211 ++++++++++++++---- .../notification-thread-internal.h | 7 +- src/lib/lttng-ctl/channel.c | 85 ++++--- 4 files changed, 231 insertions(+), 78 deletions(-) diff --git a/include/lttng/notification/channel-internal.h b/include/lttng/notification/channel-internal.h index f7df8e520..befdca083 100644 --- a/include/lttng/notification/channel-internal.h +++ b/include/lttng/notification/channel-internal.h @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include #include @@ -44,6 +44,8 @@ struct lttng_notification_channel_message { int8_t type; /* Size of the payload following this field. */ uint32_t size; + /* Number of FDs sent. */ + uint32_t fds; char payload[]; } LTTNG_PACKED; @@ -88,7 +90,7 @@ struct lttng_notification_channel { /* List of struct pending_notification. */ struct cds_list_head list; } pending_notifications; - struct lttng_dynamic_buffer reception_buffer; + struct lttng_payload reception_payload; /* Sessiond notification protocol version. */ struct { bool set; diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c index ff810dece..86a114c07 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.c +++ b/src/bin/lttng-sessiond/notification-thread-events.c @@ -1184,8 +1184,8 @@ void notification_client_destroy(struct notification_client *client, client->socket = -1; } client->communication.active = false; - lttng_dynamic_buffer_reset(&client->communication.inbound.buffer); - lttng_dynamic_buffer_reset(&client->communication.outbound.buffer); + lttng_payload_reset(&client->communication.inbound.payload); + lttng_payload_reset(&client->communication.outbound.payload); pthread_mutex_destroy(&client->lock); call_rcu(&client->rcu_node, free_notification_client_rcu); } @@ -2582,9 +2582,8 @@ int client_reset_inbound_state(struct notification_client *client) { int ret; - ret = lttng_dynamic_buffer_set_size( - &client->communication.inbound.buffer, 0); - assert(!ret); + + lttng_payload_clear(&client->communication.inbound.payload); client->communication.inbound.bytes_to_receive = sizeof(struct lttng_notification_channel_message); @@ -2593,8 +2592,9 @@ int client_reset_inbound_state(struct notification_client *client) LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1); LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1); ret = lttng_dynamic_buffer_set_size( - &client->communication.inbound.buffer, + &client->communication.inbound.payload.buffer, client->communication.inbound.bytes_to_receive); + return ret; } @@ -2616,8 +2616,8 @@ int handle_notification_thread_client_connect( pthread_mutex_init(&client->lock, NULL); client->id = state->next_notification_client_id++; CDS_INIT_LIST_HEAD(&client->condition_list); - lttng_dynamic_buffer_init(&client->communication.inbound.buffer); - lttng_dynamic_buffer_init(&client->communication.outbound.buffer); + lttng_payload_init(&client->communication.inbound.payload); + lttng_payload_init(&client->communication.outbound.payload); client->communication.inbound.expect_creds = true; ret = client_reset_inbound_state(client); @@ -2835,6 +2835,10 @@ enum client_transmission_status client_flush_outgoing_queue( ssize_t ret; size_t to_send_count; enum client_transmission_status status; + struct lttng_payload_view pv = lttng_payload_view_from_payload( + &client->communication.outbound.payload, 0, -1); + const int fds_to_send_count = + lttng_payload_view_get_fd_handle_count(&pv); ASSERT_LOCKED(client->lock); @@ -2843,55 +2847,121 @@ enum client_transmission_status client_flush_outgoing_queue( goto end; } - assert(client->communication.outbound.buffer.size != 0); - to_send_count = client->communication.outbound.buffer.size; + if (pv.buffer.size == 0) { + /* + * If both data and fds are equal to zero, we are in an invalid + * state. + */ + assert(fds_to_send_count != 0); + goto send_fds; + } + + /* Send data. */ + to_send_count = pv.buffer.size; DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue", client->socket); ret = lttcomm_send_unix_sock_non_block(client->socket, - client->communication.outbound.buffer.data, + pv.buffer.data, to_send_count); if ((ret >= 0 && ret < to_send_count)) { DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed", client->socket); to_send_count -= max(ret, 0); - memcpy(client->communication.outbound.buffer.data, - client->communication.outbound.buffer.data + - client->communication.outbound.buffer.size - to_send_count, + memcpy(client->communication.outbound.payload.buffer.data, + pv.buffer.data + + pv.buffer.size - to_send_count, to_send_count); ret = lttng_dynamic_buffer_set_size( - &client->communication.outbound.buffer, + &client->communication.outbound.payload.buffer, to_send_count); if (ret) { goto error; } status = CLIENT_TRANSMISSION_STATUS_QUEUED; + goto end; } else if (ret < 0) { /* Generic error, disable the client's communication. */ ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)", client->socket); client->communication.active = false; status = CLIENT_TRANSMISSION_STATUS_FAIL; + goto end; } else { - /* No error and flushed the queue completely. */ + /* + * No error and flushed the queue completely. + * + * The payload buffer size is used later to + * check if there is notifications queued. So albeit that the + * direct caller knows that the transmission is complete, we + * need to set the buffer size to zero. + */ ret = lttng_dynamic_buffer_set_size( - &client->communication.outbound.buffer, 0); + &client->communication.outbound.payload.buffer, 0); if (ret) { goto error; } + } - client->communication.outbound.queued_command_reply = false; - client->communication.outbound.dropped_notification = false; +send_fds: + /* No fds to send, transmission is complete. */ + if (fds_to_send_count == 0) { status = CLIENT_TRANSMISSION_STATUS_COMPLETE; + goto end; } + + ret = lttcomm_send_payload_view_fds_unix_sock_non_block( + client->socket, &pv); + if (ret < 0) { + /* Generic error, disable the client's communication. */ + ERR("[notification-thread] Failed to flush outgoing fds queue, disconnecting client (socket fd = %i)", + client->socket); + client->communication.active = false; + status = CLIENT_TRANSMISSION_STATUS_FAIL; + goto end; + } else if (ret == 0) { + /* Nothing could be sent. */ + status = CLIENT_TRANSMISSION_STATUS_QUEUED; + } else { + /* Fd passing is an all or nothing kind of thing. */ + status = CLIENT_TRANSMISSION_STATUS_COMPLETE; + /* + * The payload _fd_array count is used later to + * check if there is notifications queued. So although the + * direct caller knows that the transmission is complete, we + * need to clear the _fd_array for the queuing check. + */ + lttng_dynamic_pointer_array_clear( + &client->communication.outbound.payload + ._fd_handles); + } + end: + if (status == CLIENT_TRANSMISSION_STATUS_COMPLETE) { + client->communication.outbound.queued_command_reply = false; + client->communication.outbound.dropped_notification = false; + lttng_payload_clear(&client->communication.outbound.payload); + } + return status; error: return CLIENT_TRANSMISSION_STATUS_ERROR; } +static +bool client_has_outbound_data_left( + const struct notification_client *client) +{ + const struct lttng_payload_view pv = lttng_payload_view_from_payload( + &client->communication.outbound.payload, 0, -1); + const bool has_data = pv.buffer.size != 0; + const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv); + + return has_data || has_fds; +} + /* Client lock must _not_ be held by the caller. */ static int client_send_command_reply(struct notification_client *client, @@ -2921,14 +2991,15 @@ int client_send_command_reply(struct notification_client *client, /* Enqueue buffer to outgoing queue and flush it. */ ret = lttng_dynamic_buffer_append( - &client->communication.outbound.buffer, + &client->communication.outbound.payload.buffer, buffer, sizeof(buffer)); if (ret) { goto error_unlock; } transmission_status = client_flush_outgoing_queue(client); - if (client->communication.outbound.buffer.size != 0) { + + if (client_has_outbound_data_left(client)) { /* Queue could not be emptied. */ client->communication.outbound.queued_command_reply = true; } @@ -2959,9 +3030,9 @@ int client_handle_message_unknown(struct notification_client *client, */ const struct lttng_notification_channel_message *msg; - assert(sizeof(*msg) == client->communication.inbound.buffer.size); + assert(sizeof(*msg) == client->communication.inbound.payload.buffer.size); msg = (const struct lttng_notification_channel_message *) - client->communication.inbound.buffer.data; + client->communication.inbound.payload.buffer.data; if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) { @@ -2983,10 +3054,14 @@ int client_handle_message_unknown(struct notification_client *client, } client->communication.inbound.bytes_to_receive = msg->size; + client->communication.inbound.fds_to_receive = msg->fds; client->communication.inbound.msg_type = (enum lttng_notification_channel_message_type) msg->type; ret = lttng_dynamic_buffer_set_size( - &client->communication.inbound.buffer, msg->size); + &client->communication.inbound.payload.buffer, msg->size); + + /* msg is not valid anymore due to lttng_dynamic_buffer_set_size. */ + msg = NULL; end: return ret; } @@ -3015,7 +3090,7 @@ int client_handle_message_handshake(struct notification_client *client, handshake_client = (struct lttng_notification_channel_command_handshake *) - client->communication.inbound.buffer + client->communication.inbound.payload.buffer .data; client->major = handshake_client->major; client->minor = handshake_client->minor; @@ -3041,7 +3116,7 @@ int client_handle_message_handshake(struct notification_client *client, pthread_mutex_lock(&client->lock); /* Outgoing queue will be flushed when the command reply is sent. */ ret = lttng_dynamic_buffer_append( - &client->communication.outbound.buffer, send_buffer, + &client->communication.outbound.payload.buffer, send_buffer, sizeof(send_buffer)); if (ret) { ERR("[notification-thread] Failed to send protocol version to notification channel client"); @@ -3084,8 +3159,8 @@ int client_handle_message_subscription( enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; struct lttng_payload_view condition_view = - lttng_payload_view_from_dynamic_buffer( - &client->communication.inbound.buffer, + lttng_payload_view_from_payload( + &client->communication.inbound.payload, 0, -1); size_t expected_condition_size; @@ -3094,7 +3169,7 @@ int client_handle_message_subscription( * other thread accessing clients (action executor) only uses the * outbound state. */ - expected_condition_size = client->communication.inbound.buffer.size; + expected_condition_size = client->communication.inbound.payload.buffer.size; ret = lttng_condition_create_from_payload(&condition_view, &condition); if (ret != expected_condition_size) { ERR("[notification-thread] Malformed condition received from client"); @@ -3179,7 +3254,6 @@ int handle_notification_thread_client_in( struct notification_client *client; ssize_t recv_ret; size_t offset; - bool message_is_complete = false; rcu_read_lock(); client = get_client_from_socket(socket, state); @@ -3189,11 +3263,11 @@ int handle_notification_thread_client_in( goto end; } - offset = client->communication.inbound.buffer.size - + offset = client->communication.inbound.payload.buffer.size - client->communication.inbound.bytes_to_receive; if (client->communication.inbound.expect_creds) { recv_ret = lttcomm_recv_creds_unix_sock(socket, - client->communication.inbound.buffer.data + offset, + client->communication.inbound.payload.buffer.data + offset, client->communication.inbound.bytes_to_receive, &client->communication.inbound.creds); if (recv_ret > 0) { @@ -3202,32 +3276,66 @@ int handle_notification_thread_client_in( } } else { recv_ret = lttcomm_recv_unix_sock_non_block(socket, - client->communication.inbound.buffer.data + offset, + client->communication.inbound.payload.buffer.data + offset, client->communication.inbound.bytes_to_receive); } if (recv_ret >= 0) { client->communication.inbound.bytes_to_receive -= recv_ret; - message_is_complete = client->communication.inbound - .bytes_to_receive == 0; + } else { + goto error_disconnect_client; } - if (recv_ret < 0) { - goto error_disconnect_client; + if (client->communication.inbound.bytes_to_receive != 0) { + /* Message incomplete wait for more data. */ + ret = 0; + goto end; } - if (message_is_complete) { - ret = client_dispatch_message(client, state); - if (ret) { + assert(client->communication.inbound.bytes_to_receive == 0); + + /* Receive fds. */ + if (client->communication.inbound.fds_to_receive != 0) { + ret = lttcomm_recv_payload_fds_unix_sock_non_block( + client->socket, + client->communication.inbound.fds_to_receive, + &client->communication.inbound.payload); + if (ret > 0) { /* - * Only returns an error if this client must be - * disconnected. + * Fds received. non blocking fds passing is all + * or nothing. */ + ssize_t expected_size; + + expected_size = sizeof(int) * + client->communication.inbound + .fds_to_receive; + assert(ret == expected_size); + client->communication.inbound.fds_to_receive = 0; + } else if (ret == 0) { + /* Received nothing. */ + ret = 0; + goto end; + } else { goto error_disconnect_client; } } + + /* At this point the message is complete.*/ + assert(client->communication.inbound.bytes_to_receive == 0 && + client->communication.inbound.fds_to_receive == 0); + ret = client_dispatch_message(client, state); + if (ret) { + /* + * Only returns an error if this client must be + * disconnected. + */ + goto error_disconnect_client; + } + end: rcu_read_unlock(); return ret; + error_disconnect_client: ret = notification_thread_client_disconnect(client, state); goto end; @@ -3448,7 +3556,7 @@ int client_notification_overflow(struct notification_client *client) client->communication.outbound.dropped_notification = true; ret = lttng_dynamic_buffer_append( - &client->communication.outbound.buffer, &msg, + &client->communication.outbound.payload.buffer, &msg, sizeof(msg)); if (ret) { PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue", @@ -3556,6 +3664,16 @@ int notification_client_list_send_evaluation( ->size = (uint32_t)( msg_payload.buffer.size - sizeof(msg_header)); + /* Update the payload number of fds. */ + { + const struct lttng_payload_view pv = lttng_payload_view_from_payload( + &msg_payload, 0, -1); + + ((struct lttng_notification_channel_message *) + msg_payload.buffer.data)->fds = (uint32_t) + lttng_payload_view_get_fd_handle_count(&pv); + } + pthread_mutex_lock(&client_list->lock); cds_list_for_each_entry_safe(client_list_element, tmp, &client_list->list, node) { @@ -3594,7 +3712,8 @@ int notification_client_list_send_evaluation( DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)", client->socket, msg_payload.buffer.size); - if (client->communication.outbound.buffer.size) { + + if (client_has_outbound_data_left(client)) { /* * Outgoing data is already buffered for this client; * drop the notification and enqueue a "dropped @@ -3609,9 +3728,7 @@ int notification_client_list_send_evaluation( } } - ret = lttng_dynamic_buffer_append_buffer( - &client->communication.outbound.buffer, - &msg_payload.buffer); + ret = lttng_payload_copy(&msg_payload, &client->communication.outbound.payload); if (ret) { /* Fatal error. */ goto skip_client; diff --git a/src/bin/lttng-sessiond/notification-thread-internal.h b/src/bin/lttng-sessiond/notification-thread-internal.h index e36180fb6..eb23d1f78 100644 --- a/src/bin/lttng-sessiond/notification-thread-internal.h +++ b/src/bin/lttng-sessiond/notification-thread-internal.h @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -156,9 +157,11 @@ struct notification_client { * buffers' "size" is set to contain the current * message's complete payload. */ - struct lttng_dynamic_buffer buffer; + struct lttng_payload payload; /* Bytes left to receive for the current message. */ size_t bytes_to_receive; + /* FDs left to receive for the current message. */ + int fds_to_receive; /* Type of the message being received. */ enum lttng_notification_channel_message_type msg_type; /* @@ -192,7 +195,7 @@ struct notification_client { * misbehaving/malicious client. */ bool queued_command_reply; - struct lttng_dynamic_buffer buffer; + struct lttng_payload payload; } outbound; } communication; /* call_rcu delayed reclaim. */ diff --git a/src/lib/lttng-ctl/channel.c b/src/lib/lttng-ctl/channel.c index 16bd298be..a4e910e1d 100644 --- a/src/lib/lttng-ctl/channel.c +++ b/src/lib/lttng-ctl/channel.c @@ -14,6 +14,9 @@ #include #include #include +#include +#include +#include #include #include "lttng-ctl-helper.h" #include @@ -31,10 +34,7 @@ int receive_message(struct lttng_notification_channel *channel) ssize_t ret; struct lttng_notification_channel_message msg; - if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) { - ret = -1; - goto end; - } + lttng_payload_clear(&channel->reception_payload); ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg)); if (ret <= 0) { @@ -48,33 +48,41 @@ int receive_message(struct lttng_notification_channel *channel) } /* Add message header at buffer's start. */ - ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg, + ret = lttng_dynamic_buffer_append(&channel->reception_payload.buffer, &msg, sizeof(msg)); if (ret) { goto error; } /* Reserve space for the payload. */ - ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer, - channel->reception_buffer.size + msg.size); + ret = lttng_dynamic_buffer_set_size(&channel->reception_payload.buffer, + channel->reception_payload.buffer.size + msg.size); if (ret) { goto error; } /* Receive message payload. */ ret = lttcomm_recv_unix_sock(channel->socket, - channel->reception_buffer.data + sizeof(msg), msg.size); + channel->reception_payload.buffer.data + sizeof(msg), msg.size); if (ret < (ssize_t) msg.size) { ret = -1; goto error; } + + /* Receive message fds. */ + if (msg.fds != 0) { + ret = lttcomm_recv_payload_fds_unix_sock(channel->socket, + msg.fds, &channel->reception_payload); + if (ret < sizeof(int) * msg.fds) { + ret = -1; + goto error; + } + } ret = 0; end: return ret; error: - if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) { - ret = -1; - } + lttng_payload_clear(&channel->reception_payload); goto end; } @@ -84,10 +92,10 @@ enum lttng_notification_channel_message_type get_current_message_type( { struct lttng_notification_channel_message *msg; - assert(channel->reception_buffer.size >= sizeof(*msg)); + assert(channel->reception_payload.buffer.size >= sizeof(*msg)); msg = (struct lttng_notification_channel_message *) - channel->reception_buffer.data; + channel->reception_payload.buffer.data; return (enum lttng_notification_channel_message_type) msg->type; } @@ -98,14 +106,14 @@ struct lttng_notification *create_notification_from_current_message( ssize_t ret; struct lttng_notification *notification = NULL; - if (channel->reception_buffer.size <= + if (channel->reception_payload.buffer.size <= sizeof(struct lttng_notification_channel_message)) { goto end; } { - struct lttng_payload_view view = lttng_payload_view_from_dynamic_buffer( - &channel->reception_buffer, + struct lttng_payload_view view = lttng_payload_view_from_payload( + &channel->reception_payload, sizeof(struct lttng_notification_channel_message), -1); @@ -113,7 +121,7 @@ struct lttng_notification *create_notification_from_current_message( &view, ¬ification); } - if (ret != channel->reception_buffer.size - + if (ret != channel->reception_payload.buffer.size - sizeof(struct lttng_notification_channel_message)) { lttng_notification_destroy(notification); notification = NULL; @@ -147,7 +155,7 @@ struct lttng_notification_channel *lttng_notification_channel_create( } channel->socket = -1; pthread_mutex_init(&channel->lock, NULL); - lttng_dynamic_buffer_init(&channel->reception_buffer); + lttng_payload_init(&channel->reception_payload); CDS_INIT_LIST_HEAD(&channel->pending_notifications.list); is_root = (getuid() == 0); @@ -520,7 +528,7 @@ int receive_command_reply(struct lttng_notification_channel *channel, struct lttng_notification_channel_command_handshake *handshake; handshake = (struct lttng_notification_channel_command_handshake *) - (channel->reception_buffer.data + + (channel->reception_payload.buffer.data + sizeof(struct lttng_notification_channel_message)); channel->version.major = handshake->major; channel->version.minor = handshake->minor; @@ -534,7 +542,7 @@ int receive_command_reply(struct lttng_notification_channel *channel, } exit_loop: - if (channel->reception_buffer.size < + if (channel->reception_payload.buffer.size < (sizeof(struct lttng_notification_channel_message) + sizeof(*reply))) { /* Invalid message received. */ @@ -543,7 +551,7 @@ exit_loop: } reply = (struct lttng_notification_channel_command_reply *) - (channel->reception_buffer.data + + (channel->reception_payload.buffer.data + sizeof(struct lttng_notification_channel_message)); *status = (enum lttng_notification_channel_status) reply->status; end: @@ -625,6 +633,7 @@ enum lttng_notification_channel_status send_condition_command( pthread_mutex_lock(&channel->lock); socket = channel->socket; + if (!lttng_condition_validate(condition)) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; goto end_unlock; @@ -647,11 +656,33 @@ enum lttng_notification_channel_status send_condition_command( ((struct lttng_notification_channel_message *) payload.buffer.data)->size = (uint32_t) (payload.buffer.size - sizeof(cmd_header)); - ret = lttcomm_send_unix_sock( - socket, payload.buffer.data, payload.buffer.size); - if (ret < 0) { - status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + { + struct lttng_payload_view pv = + lttng_payload_view_from_payload( + &payload, 0, -1); + const int fd_count = + lttng_payload_view_get_fd_handle_count(&pv); + + /* Update fd count. */ + ((struct lttng_notification_channel_message *) payload.buffer.data)->fds = + (uint32_t) fd_count; + + ret = lttcomm_send_unix_sock( + socket, pv.buffer.data, pv.buffer.size); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_unlock; + } + + /* Pass fds if present. */ + if (fd_count > 0) { + ret = lttcomm_send_payload_view_fds_unix_sock(socket, + &pv); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_unlock; + } + } } ret = receive_command_reply(channel, &status); @@ -695,6 +726,6 @@ void lttng_notification_channel_destroy( (void) lttcomm_close_unix_sock(channel->socket); } pthread_mutex_destroy(&channel->lock); - lttng_dynamic_buffer_reset(&channel->reception_buffer); + lttng_payload_reset(&channel->reception_payload); free(channel); } -- 2.34.1