X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fnotification-thread-events.c;h=6ba6c415b65b74bf2c6571fb6261d6f3f3a34f80;hp=285bcf951aafe3a5bb2801777e7a13eb06dc5ec7;hb=ae0a823f9f7e1d3800479488a58efc2f92f27d89;hpb=505b2d90aa87592186ecc2a119cf67fb3f90d168 diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c index 285bcf951..6ba6c415b 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.c +++ b/src/bin/lttng-sessiond/notification-thread-events.c @@ -2412,6 +2412,8 @@ int handle_notification_thread_command( pthread_mutex_lock(&handle->cmd_queue.lock); cmd = cds_list_first_entry(&handle->cmd_queue.list, struct notification_thread_command, cmd_list_node); + cds_list_del(&cmd->cmd_list_node); + pthread_mutex_unlock(&handle->cmd_queue.lock); switch (cmd->type) { case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER: DBG("[notification-thread] Received register trigger command"); @@ -2474,14 +2476,16 @@ int handle_notification_thread_command( goto error_unlock; } end: - cds_list_del(&cmd->cmd_list_node); - lttng_waiter_wake_up(&cmd->reply_waiter); - pthread_mutex_unlock(&handle->cmd_queue.lock); + if (cmd->is_async) { + free(cmd); + cmd = NULL; + } else { + lttng_waiter_wake_up(&cmd->reply_waiter); + } return ret; error_unlock: /* Wake-up and return a fatal error to the calling thread. */ lttng_waiter_wake_up(&cmd->reply_waiter); - pthread_mutex_unlock(&handle->cmd_queue.lock); cmd->reply_code = LTTNG_ERR_FATAL; error: /* Indicate a fatal error to the caller. */ @@ -2720,13 +2724,61 @@ int handle_notification_thread_trigger_unregister_all( return error_occurred ? -1 : 0; } +static +int client_handle_transmission_status( + struct notification_client *client, + enum client_transmission_status transmission_status, + struct notification_thread_state *state) +{ + int ret = 0; + + switch (transmission_status) { + case CLIENT_TRANSMISSION_STATUS_COMPLETE: + ret = lttng_poll_mod(&state->events, client->socket, + CLIENT_POLL_MASK_IN); + if (ret) { + goto end; + } + + client->communication.outbound.queued_command_reply = false; + client->communication.outbound.dropped_notification = false; + break; + case CLIENT_TRANSMISSION_STATUS_QUEUED: + /* + * We want to be notified whenever there is buffer space + * available to send the rest of the payload. + */ + ret = lttng_poll_mod(&state->events, client->socket, + CLIENT_POLL_MASK_IN_OUT); + if (ret) { + goto end; + } + break; + case CLIENT_TRANSMISSION_STATUS_FAIL: + ret = notification_thread_client_disconnect(client, state); + if (ret) { + goto end; + } + break; + case CLIENT_TRANSMISSION_STATUS_ERROR: + ret = -1; + goto end; + default: + abort(); + } +end: + return ret; +} + /* Client lock must be acquired by caller. */ static -int client_flush_outgoing_queue(struct notification_client *client, +enum client_transmission_status client_flush_outgoing_queue( + struct notification_client *client, struct notification_thread_state *state) { ssize_t ret; size_t to_send_count; + enum client_transmission_status status; ASSERT_LOCKED(client->lock); @@ -2751,41 +2803,29 @@ int client_flush_outgoing_queue(struct notification_client *client, &client->communication.outbound.buffer, to_send_count); if (ret) { + status = CLIENT_TRANSMISSION_STATUS_ERROR; goto error; } - - /* - * We want to be notified whenever there is buffer space - * available to send the rest of the payload. - */ - ret = lttng_poll_mod(&state->events, client->socket, - CLIENT_POLL_MASK_IN_OUT); - if (ret) { - goto error; - } + status = CLIENT_TRANSMISSION_STATUS_QUEUED; } else if (ret < 0) { /* Generic error, disconnect the client. */ - ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)", + ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)", client->socket); - ret = notification_thread_client_disconnect(client, state); - if (ret) { - goto error; - } + status = CLIENT_TRANSMISSION_STATUS_FAIL; } else { /* No error and flushed the queue completely. */ ret = lttng_dynamic_buffer_set_size( &client->communication.outbound.buffer, 0); if (ret) { + status = CLIENT_TRANSMISSION_STATUS_ERROR; goto error; } - ret = lttng_poll_mod(&state->events, client->socket, - CLIENT_POLL_MASK_IN); - if (ret) { - goto error; - } + status = CLIENT_TRANSMISSION_STATUS_COMPLETE; + } - client->communication.outbound.queued_command_reply = false; - client->communication.outbound.dropped_notification = false; + ret = client_handle_transmission_status(client, status, state); + if (ret) { + goto error; } return 0;