/*
- * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
- * This library is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License, version 2.1 only,
- * as published by the Free Software Foundation.
+ * SPDX-License-Identifier: LGPL-2.1-only
*
- * This library is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
- * for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this library; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <lttng/notification/notification-internal.h>
#include <common/defaults.h>
#include <assert.h>
#include "lttng-ctl-helper.h"
+#include <common/compat/poll.h>
static
int handshake(struct lttng_notification_channel *channel);
/*
* Populates the reception buffer with the next complete message.
- * The caller must acquire the client's lock.
+ * The caller must acquire the channel's lock.
*/
static
int receive_message(struct lttng_notification_channel *channel)
ssize_t ret;
struct lttng_notification_channel_message msg;
- ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0);
- if (ret) {
- goto error;
+ if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
+ ret = -1;
+ goto end;
}
ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
end:
return ret;
error:
- lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0);
+ if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
+ ret = -1;
+ }
goto end;
}
struct lttng_notification *notification = NULL;
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+ struct lttng_poll_event events;
if (!channel || !_notification) {
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
goto end;
}
+ pthread_mutex_lock(&channel->lock);
+
if (channel->pending_notifications.count) {
struct pending_notification *pending_notification;
cds_list_del(&pending_notification->node);
channel->pending_notifications.count--;
free(pending_notification);
- goto end;
+ goto end_unlock;
}
- pthread_mutex_lock(&channel->lock);
+ /*
+ * Block on interruptible epoll/poll() instead of the message reception
+ * itself as the recvmsg() wrappers always restart on EINTR. We choose
+ * to wait using interruptible epoll/poll() in order to:
+ * 1) Return if a signal occurs,
+ * 2) Not deal with partially received messages.
+ *
+ * The drawback to this approach is that we assume that messages
+ * are complete/well formed. If a message is shorter than its
+ * announced length, receive_message() will block on recvmsg()
+ * and never return (even if a signal is received).
+ */
+ ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
+ if (ret < 0) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+ goto end_unlock;
+ }
+ ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
+ if (ret < 0) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+ goto end_clean_poll;
+ }
+ ret = lttng_poll_wait_interruptible(&events, -1);
+ if (ret <= 0) {
+ status = (ret == -1 && errno == EINTR) ?
+ LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
+ LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+ goto end_clean_poll;
+ }
ret = receive_message(channel);
if (ret) {
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
- goto end_unlock;
+ goto end_clean_poll;
}
switch (get_current_message_type(channel)) {
channel);
if (!notification) {
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
- goto end_unlock;
+ goto end_clean_poll;
}
break;
case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
default:
/* Protocol error. */
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
- goto end_unlock;
+ goto end_clean_poll;
}
+end_clean_poll:
+ lttng_poll_clean(&events);
end_unlock:
pthread_mutex_unlock(&channel->lock);
+ *_notification = notification;
end:
- if (_notification) {
- *_notification = notification;
- }
return status;
}
goto end;
}
+enum lttng_notification_channel_status
+lttng_notification_channel_has_pending_notification(
+ struct lttng_notification_channel *channel,
+ bool *_notification_pending)
+{
+ int ret;
+ enum lttng_notification_channel_status status =
+ LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+ struct lttng_poll_event events;
+
+ if (!channel || !_notification_pending) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
+ goto end;
+ }
+
+ pthread_mutex_lock(&channel->lock);
+
+ if (channel->pending_notifications.count) {
+ *_notification_pending = true;
+ goto end_unlock;
+ }
+
+ if (channel->socket < 0) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
+ goto end_unlock;
+ }
+
+ /*
+ * Check, without blocking, if data is available on the channel's
+ * socket. If there is data available, it is safe to read (blocking)
+ * on the socket for a message from the session daemon.
+ *
+ * Since all commands wait for the session daemon's reply before
+ * releasing the channel's lock, the protocol only allows for
+ * notifications and "notification dropped" messages to come
+ * through. If we receive a different message type, it is
+ * considered a protocol error.
+ *
+ * Note that this function is not guaranteed not to block. This
+ * will block until our peer (the session daemon) has sent a complete
+ * message if we see data available on the socket. If the peer does
+ * not respect the protocol, this may block indefinitely.
+ */
+ ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
+ if (ret < 0) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+ goto end_unlock;
+ }
+ ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
+ if (ret < 0) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+ goto end_clean_poll;
+ }
+ /* timeout = 0: return immediately. */
+ ret = lttng_poll_wait_interruptible(&events, 0);
+ if (ret == 0) {
+ /* No data available. */
+ *_notification_pending = false;
+ goto end_clean_poll;
+ } else if (ret < 0) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+ goto end_clean_poll;
+ }
+
+ /* Data available on socket. */
+ ret = receive_message(channel);
+ if (ret) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+ goto end_clean_poll;
+ }
+
+ switch (get_current_message_type(channel)) {
+ case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
+ ret = enqueue_notification_from_current_message(channel);
+ if (ret) {
+ goto end_clean_poll;
+ }
+ *_notification_pending = true;
+ break;
+ case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
+ ret = enqueue_dropped_notification(channel);
+ if (ret) {
+ goto end_clean_poll;
+ }
+ *_notification_pending = true;
+ break;
+ default:
+ /* Protocol error. */
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+ goto end_clean_poll;
+ }
+
+end_clean_poll:
+ lttng_poll_clean(&events);
+end_unlock:
+ pthread_mutex_unlock(&channel->lock);
+end:
+ return status;
+}
+
static
int receive_command_reply(struct lttng_notification_channel *channel,
enum lttng_notification_channel_status *status)
pthread_mutex_lock(&channel->lock);
- ret = lttcomm_send_unix_sock(channel->socket, send_buffer,
+ ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
sizeof(send_buffer));
if (ret < 0) {
goto end_unlock;
const struct lttng_condition *condition)
{
int socket;
- ssize_t command_size, ret;
+ ssize_t ret;
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
- char *command_buffer = NULL;
- struct lttng_notification_channel_message cmd_message = {
- .type = type,
+ struct lttng_dynamic_buffer buffer;
+ struct lttng_notification_channel_message cmd_header = {
+ .type = (int8_t) type,
};
+ lttng_dynamic_buffer_init(&buffer);
+
if (!channel) {
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
goto end;
goto end_unlock;
}
- ret = lttng_condition_serialize(condition, NULL);
- if (ret < 0) {
- status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
- goto end_unlock;
- }
- assert(ret < UINT32_MAX);
- cmd_message.size = (uint32_t) ret;
- command_size = ret + sizeof(
- struct lttng_notification_channel_message);
- command_buffer = zmalloc(command_size);
- if (!command_buffer) {
+ ret = lttng_dynamic_buffer_append(&buffer, &cmd_header,
+ sizeof(cmd_header));
+ if (ret) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
goto end_unlock;
}
- memcpy(command_buffer, &cmd_message, sizeof(cmd_message));
- ret = lttng_condition_serialize(condition,
- command_buffer + sizeof(cmd_message));
- if (ret < 0) {
+ ret = lttng_condition_serialize(condition, &buffer);
+ if (ret) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
goto end_unlock;
}
- ret = lttcomm_send_unix_sock(socket, command_buffer, command_size);
+ /* Update payload length. */
+ ((struct lttng_notification_channel_message *) buffer.data)->size =
+ (uint32_t) (buffer.size - sizeof(cmd_header));
+
+ ret = lttcomm_send_unix_sock(socket, buffer.data, buffer.size);
if (ret < 0) {
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
goto end_unlock;
end_unlock:
pthread_mutex_unlock(&channel->lock);
end:
- free(command_buffer);
+ lttng_dynamic_buffer_reset(&buffer);
return status;
}