X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Flib%2Flttng-ctl%2Fchannel.c;h=16bd298be330e4dfcbc274326c22825669fd5346;hp=16474464d880a9ffaa6c0fbad6c068dbeb781407;hb=ae0a823f9f7e1d3800479488a58efc2f92f27d89;hpb=1d757b1cd3b4669b52e2d9ceafb03eafd42490ff diff --git a/src/lib/lttng-ctl/channel.c b/src/lib/lttng-ctl/channel.c index 16474464d..16bd298be 100644 --- a/src/lib/lttng-ctl/channel.c +++ b/src/lib/lttng-ctl/channel.c @@ -1,18 +1,8 @@ /* - * Copyright (C) 2017 - Jérémie Galarneau + * Copyright (C) 2017 Jérémie Galarneau * - * This library is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License, version 2.1 only, - * as published by the Free Software Foundation. + * SPDX-License-Identifier: LGPL-2.1-only * - * This library is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License - * for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this library; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include @@ -26,8 +16,7 @@ #include #include #include "lttng-ctl-helper.h" -#include -#include +#include static int handshake(struct lttng_notification_channel *channel); @@ -108,17 +97,22 @@ struct lttng_notification *create_notification_from_current_message( { ssize_t ret; struct lttng_notification *notification = NULL; - struct lttng_buffer_view view; if (channel->reception_buffer.size <= sizeof(struct lttng_notification_channel_message)) { goto end; } - view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer, - sizeof(struct lttng_notification_channel_message), -1); + { + struct lttng_payload_view view = lttng_payload_view_from_dynamic_buffer( + &channel->reception_buffer, + sizeof(struct lttng_notification_channel_message), + -1); + + ret = lttng_notification_create_from_payload( + &view, ¬ification); + } - ret = lttng_notification_create_from_buffer(&view, ¬ification); if (ret != channel->reception_buffer.size - sizeof(struct lttng_notification_channel_message)) { lttng_notification_destroy(notification); @@ -211,6 +205,7 @@ lttng_notification_channel_get_next_notification( struct lttng_notification *notification = NULL; enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + struct lttng_poll_event events; if (!channel || !_notification) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; @@ -239,10 +234,40 @@ lttng_notification_channel_get_next_notification( goto end_unlock; } + /* + * Block on interruptible epoll/poll() instead of the message reception + * itself as the recvmsg() wrappers always restart on EINTR. We choose + * to wait using interruptible epoll/poll() in order to: + * 1) Return if a signal occurs, + * 2) Not deal with partially received messages. + * + * The drawback to this approach is that we assume that messages + * are complete/well formed. If a message is shorter than its + * announced length, receive_message() will block on recvmsg() + * and never return (even if a signal is received). + */ + ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_unlock; + } + ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_clean_poll; + } + ret = lttng_poll_wait_interruptible(&events, -1); + if (ret <= 0) { + status = (ret == -1 && errno == EINTR) ? + LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED : + LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_clean_poll; + } + ret = receive_message(channel); if (ret) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } switch (get_current_message_type(channel)) { @@ -251,7 +276,7 @@ lttng_notification_channel_get_next_notification( channel); if (!notification) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } break; case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED: @@ -261,15 +286,15 @@ lttng_notification_channel_get_next_notification( default: /* Protocol error. */ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } +end_clean_poll: + lttng_poll_clean(&events); end_unlock: pthread_mutex_unlock(&channel->lock); + *_notification = notification; end: - if (_notification) { - *_notification = notification; - } return status; } @@ -366,11 +391,7 @@ lttng_notification_channel_has_pending_notification( int ret; enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; - fd_set read_fds; - struct timeval timeout; - - FD_ZERO(&read_fds); - memset(&timeout, 0, sizeof(timeout)); + struct lttng_poll_event events; if (!channel || !_notification_pending) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; @@ -405,48 +426,57 @@ lttng_notification_channel_has_pending_notification( * message if we see data available on the socket. If the peer does * not respect the protocol, this may block indefinitely. */ - FD_SET(channel->socket, &read_fds); - do { - ret = select(channel->socket + 1, &read_fds, NULL, NULL, &timeout); - } while (ret < 0 && errno == EINTR); - + ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_unlock; + } + ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_clean_poll; + } + /* timeout = 0: return immediately. */ + ret = lttng_poll_wait_interruptible(&events, 0); if (ret == 0) { /* No data available. */ *_notification_pending = false; - goto end_unlock; + goto end_clean_poll; } else if (ret < 0) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } /* Data available on socket. */ ret = receive_message(channel); if (ret) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } switch (get_current_message_type(channel)) { case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION: ret = enqueue_notification_from_current_message(channel); if (ret) { - goto end; + goto end_clean_poll; } *_notification_pending = true; break; case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED: ret = enqueue_dropped_notification(channel); if (ret) { - goto end; + goto end_clean_poll; } *_notification_pending = true; break; default: /* Protocol error. */ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } +end_clean_poll: + lttng_poll_clean(&events); end_unlock: pthread_mutex_unlock(&channel->lock); end: @@ -575,14 +605,16 @@ enum lttng_notification_channel_status send_condition_command( const struct lttng_condition *condition) { int socket; - ssize_t command_size, ret; + ssize_t ret; enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; - char *command_buffer = NULL; - struct lttng_notification_channel_message cmd_message = { - .type = type, + struct lttng_payload payload; + struct lttng_notification_channel_message cmd_header = { + .type = (int8_t) type, }; + lttng_payload_init(&payload); + if (!channel) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; goto end; @@ -598,28 +630,25 @@ enum lttng_notification_channel_status send_condition_command( goto end_unlock; } - ret = lttng_condition_serialize(condition, NULL); - if (ret < 0) { - status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; - goto end_unlock; - } - assert(ret < UINT32_MAX); - cmd_message.size = (uint32_t) ret; - command_size = ret + sizeof( - struct lttng_notification_channel_message); - command_buffer = zmalloc(command_size); - if (!command_buffer) { + ret = lttng_dynamic_buffer_append(&payload.buffer, &cmd_header, + sizeof(cmd_header)); + if (ret) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; goto end_unlock; } - memcpy(command_buffer, &cmd_message, sizeof(cmd_message)); - ret = lttng_condition_serialize(condition, - command_buffer + sizeof(cmd_message)); - if (ret < 0) { + ret = lttng_condition_serialize(condition, &payload); + if (ret) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; goto end_unlock; } - ret = lttcomm_send_unix_sock(socket, command_buffer, command_size); + /* Update payload length. */ + ((struct lttng_notification_channel_message *) payload.buffer.data)->size = + (uint32_t) (payload.buffer.size - sizeof(cmd_header)); + + ret = lttcomm_send_unix_sock( + socket, payload.buffer.data, payload.buffer.size); if (ret < 0) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; goto end_unlock; @@ -633,7 +662,7 @@ enum lttng_notification_channel_status send_condition_command( end_unlock: pthread_mutex_unlock(&channel->lock); end: - free(command_buffer); + lttng_payload_reset(&payload); return status; }