X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fnotification-thread-events.c;h=793499c4eebb40218ceb1ac651d18b59a520edbe;hp=00a2bf66f559f1d5b8d6ee2c019e3dd0c2241534;hb=6e21424e11f47edb885c7f92ae1cae69eee0ed8e;hpb=8f56701fa4b73a75378e88a84b0500e18b51ff49 diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c index 00a2bf66f..793499c4e 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.c +++ b/src/bin/lttng-sessiond/notification-thread-events.c @@ -40,6 +40,7 @@ #include #include #include +#include #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP) #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT) @@ -96,6 +97,11 @@ struct notification_client { struct cds_lfht_node client_socket_ht_node; struct { struct { + /* + * During the reception of a message, the reception + * buffers' "size" is set to contain the current + * message's complete payload. + */ struct lttng_dynamic_buffer buffer; /* Bytes left to receive for the current message. */ size_t bytes_to_receive; @@ -105,12 +111,13 @@ struct notification_client { * Indicates whether or not credentials are expected * from the client. */ - bool receive_creds; + bool expect_creds; /* * Indicates whether or not credentials were received * from the client. */ bool creds_received; + /* Only used during credentials reception. */ lttng_sock_cred creds; } inbound; struct { @@ -405,6 +412,7 @@ int notification_thread_client_subscribe(struct notification_client *client, &iter); node = cds_lfht_iter_get_node(&iter); if (!node) { + free(client_list_element); goto end_unlock; } @@ -956,6 +964,7 @@ error: return ret; } +static int handle_notification_thread_command_unregister_trigger( struct notification_thread_state *state, struct lttng_trigger *trigger, @@ -1107,12 +1116,12 @@ int handle_notification_thread_command( } end: cds_list_del(&cmd->cmd_list_node); - futex_nto1_wake(&cmd->reply_futex); + lttng_waiter_wake_up(&cmd->reply_waiter); pthread_mutex_unlock(&handle->cmd_queue.lock); return ret; error_unlock: /* Wake-up and return a fatal error to the calling thread. */ - futex_nto1_wake(&cmd->reply_futex); + lttng_waiter_wake_up(&cmd->reply_waiter); pthread_mutex_unlock(&handle->cmd_queue.lock); cmd->reply_code = LTTNG_ERR_FATAL; error: @@ -1150,7 +1159,7 @@ end: } static -void client_reset_inbound_state(struct notification_client *client) +int client_reset_inbound_state(struct notification_client *client) { int ret; @@ -1162,9 +1171,12 @@ void client_reset_inbound_state(struct notification_client *client) sizeof(struct lttng_notification_channel_message); client->communication.inbound.msg_type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN; - client->communication.inbound.receive_creds = false; LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1); LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1); + ret = lttng_dynamic_buffer_set_size( + &client->communication.inbound.buffer, + client->communication.inbound.bytes_to_receive); + return ret; } int handle_notification_thread_client_connect( @@ -1184,7 +1196,13 @@ int handle_notification_thread_client_connect( CDS_INIT_LIST_HEAD(&client->condition_list); lttng_dynamic_buffer_init(&client->communication.inbound.buffer); lttng_dynamic_buffer_init(&client->communication.outbound.buffer); - client_reset_inbound_state(client); + client->communication.inbound.expect_creds = true; + ret = client_reset_inbound_state(client); + if (ret) { + ERR("[notification-thread] Failed to reset client communication's inbound state"); + ret = 0; + goto error; + } ret = lttcomm_accept_unix_sock(state->notification_channel_socket); if (ret < 0) { @@ -1219,7 +1237,6 @@ int handle_notification_thread_client_connect( DBG("[notification-thread] Added new notification channel client socket (%i) to poll set", client->socket); - /* Add to ht. */ rcu_read_lock(); cds_lfht_add(state->client_socket_ht, hash_client_socket(client->socket), @@ -1474,12 +1491,8 @@ int client_dispatch_message(struct notification_client *client, client->communication.inbound.bytes_to_receive = msg->size; client->communication.inbound.msg_type = (enum lttng_notification_channel_message_type) msg->type; - if (client->communication.inbound.msg_type == - LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE) { - client->communication.inbound.receive_creds = true; - } ret = lttng_dynamic_buffer_set_size( - &client->communication.inbound.buffer, 0); + &client->communication.inbound.buffer, msg->size); if (ret) { goto end; } @@ -1507,7 +1520,7 @@ int client_dispatch_message(struct notification_client *client, handshake_client = (struct lttng_notification_channel_command_handshake *) client->communication.inbound.buffer.data; - client->major = handshake_client->major; + client->major = handshake_client->major; client->minor = handshake_client->minor; if (!client->communication.inbound.creds_received) { ERR("[notification-thread] No credentials received from client"); @@ -1546,7 +1559,11 @@ int client_dispatch_message(struct notification_client *client, } /* Set reception state to receive the next message header. */ - client_reset_inbound_state(client); + ret = client_reset_inbound_state(client); + if (ret) { + ERR("[notification-thread] Failed to reset client communication's inbound state"); + goto end; + } client->validated = true; break; } @@ -1593,7 +1610,11 @@ int client_dispatch_message(struct notification_client *client, } /* Set reception state to receive the next message header. */ - client_reset_inbound_state(client); + ret = client_reset_inbound_state(client); + if (ret) { + ERR("[notification-thread] Failed to reset client communication's inbound state"); + goto end; + } break; } default: @@ -1607,7 +1628,7 @@ end: int handle_notification_thread_client_in( struct notification_thread_state *state, int socket) { - int ret; + int ret = 0; struct notification_client *client; ssize_t recv_ret; size_t offset; @@ -1619,21 +1640,15 @@ int handle_notification_thread_client_in( goto end; } - offset = client->communication.inbound.buffer.size; - ret = lttng_dynamic_buffer_set_size( - &client->communication.inbound.buffer, - client->communication.inbound.bytes_to_receive); - if (ret) { - goto end; - } - - if (client->communication.inbound.receive_creds) { + offset = client->communication.inbound.buffer.size - + client->communication.inbound.bytes_to_receive; + if (client->communication.inbound.expect_creds) { recv_ret = lttcomm_recv_creds_unix_sock(socket, client->communication.inbound.buffer.data + offset, client->communication.inbound.bytes_to_receive, &client->communication.inbound.creds); if (recv_ret > 0) { - client->communication.inbound.receive_creds = false; + client->communication.inbound.expect_creds = false; client->communication.inbound.creds_received = true; } } else { @@ -1646,14 +1661,6 @@ int handle_notification_thread_client_in( } client->communication.inbound.bytes_to_receive -= recv_ret; - ret = lttng_dynamic_buffer_set_size( - &client->communication.inbound.buffer, - client->communication.inbound.buffer.size - - client->communication.inbound.bytes_to_receive); - if (ret) { - goto end; - } - if (client->communication.inbound.bytes_to_receive == 0) { ret = client_dispatch_message(client, state); if (ret) { @@ -1948,9 +1955,7 @@ int handle_notification_thread_channel_sample( * The monitoring pipe only holds messages smaller than PIPE_BUF, * ensuring that read/write of sampling messages are atomic. */ - do { - ret = read(pipe, &sample_msg, sizeof(sample_msg)); - } while (ret == -1 && errno == EINTR); + ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg)); if (ret != sizeof(sample_msg)) { ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)", pipe);