#include "lttng-sessiond.hpp"
#include "kernel.hpp"
-#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
+#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT)
/* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */
#define MAX_CAPTURE_SIZE (PIPE_BUF)
LTTNG_OBJECT_TYPE_SESSION,
};
-struct lttng_trigger_list_element {
- /* No ownership of the trigger object is assumed. */
- struct lttng_trigger *trigger;
- struct cds_list_head node;
-};
-
struct lttng_channel_trigger_list {
struct channel_key channel_key;
/* List of struct lttng_trigger_list_element. */
struct rcu_head rcu_node;
};
+namespace {
+struct lttng_trigger_list_element {
+ /* No ownership of the trigger object is assumed. */
+ struct lttng_trigger *trigger;
+ struct cds_list_head node;
+};
+
struct lttng_trigger_ht_element {
struct lttng_trigger *trigger;
struct cds_lfht_node node;
/* call_rcu delayed reclaim. */
struct rcu_head rcu_node;
};
+} /* namespace */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(const struct lttng_condition *condition,
{
/* This double-cast is intended to supress pointer-to-cast warning. */
const notification_client_id id = *((notification_client_id *) key);
- const struct notification_client *client = caa_container_of(
- node, struct notification_client, client_id_ht_node);
+ const struct notification_client *client = lttng::utils::container_of(
+ node, ¬ification_client::client_id_ht_node);
return client->id == id;
}
int match_session(struct cds_lfht_node *node, const void *key)
{
const char *name = (const char *) key;
- struct session_info *session_info = caa_container_of(
- node, struct session_info, sessions_ht_node);
+ struct session_info *session_info = lttng::utils::container_of(
+ node, &session_info::sessions_ht_node);
return !strcmp(session_info->name, name);
}
static
void free_channel_info_rcu(struct rcu_head *node)
{
- free(caa_container_of(node, struct channel_info, rcu_node));
+ free(lttng::utils::container_of(node, &channel_info::rcu_node));
}
static
static
void free_session_info_rcu(struct rcu_head *node)
{
- free(caa_container_of(node, struct session_info, rcu_node));
+ free(lttng::utils::container_of(node, &session_info::rcu_node));
}
/* Don't call directly, use the ref-counting mechanism. */
void notification_client_list_release(struct urcu_ref *list_ref)
{
struct notification_client_list *list =
- container_of(list_ref, typeof(*list), ref);
+ lttng::utils::container_of(list_ref, ¬ification_client_list::ref);
struct notification_client_list_element *client_list_element, *tmp;
lttng_condition_put(list->condition);
&iter);
node = cds_lfht_iter_get_node(&iter);
if (node) {
- list = container_of(node, struct notification_client_list,
- notification_trigger_clients_ht_node);
+ list = lttng::utils::container_of(node,
+ ¬ification_client_list::notification_trigger_clients_ht_node);
list = notification_client_list_get(list) ? list : NULL;
}
static
void free_notification_client_rcu(struct rcu_head *node)
{
- free(caa_container_of(node, struct notification_client, rcu_node));
+ free(lttng::utils::container_of(node, ¬ification_client::rcu_node));
}
static
return 0;
}
+static
+int pop_cmd_queue(struct notification_thread_handle *handle,
+ struct notification_thread_command **cmd)
+{
+ int ret;
+ uint64_t counter;
+
+ pthread_mutex_lock(&handle->cmd_queue.lock);
+ ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
+ if (ret != sizeof(counter)) {
+ ret = -1;
+ goto error_unlock;
+ }
+
+ *cmd = cds_list_first_entry(&handle->cmd_queue.list,
+ struct notification_thread_command, cmd_list_node);
+ cds_list_del(&((*cmd)->cmd_list_node));
+ ret = 0;
+
+error_unlock:
+ pthread_mutex_unlock(&handle->cmd_queue.lock);
+ return ret;
+}
+
/* Returns 0 on success, 1 on exit requested, negative value on error. */
int handle_notification_thread_command(
struct notification_thread_handle *handle,
struct notification_thread_state *state)
{
int ret;
- uint64_t counter;
struct notification_thread_command *cmd;
- /* Read the event pipe to put it back into a quiescent state. */
- ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
- sizeof(counter));
- if (ret != sizeof(counter)) {
+ ret = pop_cmd_queue(handle, &cmd);
+ if (ret) {
goto error;
}
- pthread_mutex_lock(&handle->cmd_queue.lock);
- cmd = cds_list_first_entry(&handle->cmd_queue.list,
- struct notification_thread_command, cmd_list_node);
- cds_list_del(&cmd->cmd_list_node);
- pthread_mutex_unlock(&handle->cmd_queue.lock);
-
DBG("Received `%s` command",
notification_command_type_str(cmd->type));
switch (cmd->type) {
goto error;
}
+ client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN;
ret = lttng_poll_add(&state->events, client->socket,
- LPOLLIN | LPOLLERR |
- LPOLLHUP | LPOLLRDHUP);
+ client->communication.current_poll_events);
if (ret < 0) {
ERR("Failed to add notification channel client socket to poll set");
ret = 0;
return error_occurred ? -1 : 0;
}
+static
+bool client_has_outbound_data_left(
+ const struct notification_client *client)
+{
+ const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+ &client->communication.outbound.payload, 0, -1);
+ const bool has_data = pv.buffer.size != 0;
+ const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
+
+ return has_data || has_fds;
+}
+
static
int client_handle_transmission_status(
struct notification_client *client,
switch (transmission_status) {
case CLIENT_TRANSMISSION_STATUS_COMPLETE:
- ret = lttng_poll_mod(&state->events, client->socket,
- CLIENT_POLL_MASK_IN);
- if (ret) {
- goto end;
- }
-
- break;
case CLIENT_TRANSMISSION_STATUS_QUEUED:
+ {
+ int current_poll_events;
+ int new_poll_events;
/*
* We want to be notified whenever there is buffer space
- * available to send the rest of the payload.
+ * available to send the rest of the payload if we are
+ * waiting to send data to the client.
+ *
+ * The state of the outbound queue being sampled here is
+ * fine since:
+ * - it is okay to wake-up "for nothing" in case we see
+ * that data is left, but another thread succeeds in
+ * flushing it before us when handling the client "out"
+ * event. We will simply stop monitoring that event the next
+ * time it wakes us up and we see no data left to be sent,
+ * - if another thread fails to flush the entire client
+ * outgoing queue, it will issue a "communication update"
+ * command and cause the client's (e)poll mask to be
+ * re-evaluated.
+ *
+ * The situation we seek to avoid would be to disable the
+ * monitoring of "out" client events indefinitely when there is
+ * data to be sent, which can't happen because of the
+ * aforementioned "communication update" mechanism.
*/
- ret = lttng_poll_mod(&state->events, client->socket,
- CLIENT_POLL_MASK_IN_OUT);
- if (ret) {
- goto end;
+ pthread_mutex_lock(&client->lock);
+ current_poll_events = client->communication.current_poll_events;
+ new_poll_events = client_has_outbound_data_left(client) ?
+ CLIENT_POLL_EVENTS_IN_OUT :
+ CLIENT_POLL_EVENTS_IN;
+ client->communication.current_poll_events = new_poll_events;
+ pthread_mutex_unlock(&client->lock);
+
+ /* Update the monitored event set only if it changed. */
+ if (current_poll_events != new_poll_events) {
+ ret = lttng_poll_mod(&state->events, client->socket,
+ new_poll_events);
+ if (ret) {
+ goto end;
+ }
}
+
break;
+ }
case CLIENT_TRANSMISSION_STATUS_FAIL:
ret = notification_thread_client_disconnect(client, state);
if (ret) {
return CLIENT_TRANSMISSION_STATUS_ERROR;
}
-static
-bool client_has_outbound_data_left(
- const struct notification_client *client)
-{
- const struct lttng_payload_view pv = lttng_payload_view_from_payload(
- &client->communication.outbound.payload, 0, -1);
- const bool has_data = pv.buffer.size != 0;
- const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
-
- return has_data || has_fds;
-}
-
/* Client lock must _not_ be held by the caller. */
static
int client_send_command_reply(struct notification_client *client,
struct lttng_notification_channel_command_reply reply = {
.status = (int8_t) status,
};
- struct lttng_notification_channel_message msg = {
- .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
- .size = sizeof(reply),
- .fds = 0,
- };
+ struct lttng_notification_channel_message msg;
char buffer[sizeof(msg) + sizeof(reply)];
enum client_transmission_status transmission_status;
+ msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY;
+ msg.size = sizeof(reply);
+ msg.fds = 0;
+
memcpy(buffer, &msg, sizeof(msg));
memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
DBG("Send command reply (%i)", (int) status);
.major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
.minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
};
- const struct lttng_notification_channel_message msg_header = {
- .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
- .size = sizeof(handshake_reply),
- .fds = 0,
- };
+ struct lttng_notification_channel_message msg_header;
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
+ msg_header.type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE;
+ msg_header.size = sizeof(handshake_reply);
+ msg_header.fds = 0;
+
memcpy(send_buffer, &msg_header, sizeof(msg_header));
memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
sizeof(handshake_reply));
}
pthread_mutex_lock(&client->lock);
- transmission_status = client_flush_outgoing_queue(client);
+ if (!client_has_outbound_data_left(client)) {
+ /*
+ * A client "out" event can be received when no payload is left
+ * to send under some circumstances.
+ *
+ * Many threads can flush a client's outgoing queue and, if they
+ * had to queue their message (socket was full), will use the
+ * "communication update" command to signal the (e)poll thread
+ * to monitor for space being made available in the socket.
+ *
+ * Commands are sent over an internal pipe serviced by the same
+ * thread as the client sockets.
+ *
+ * When space is made available in the socket, there is a race
+ * between the (e)poll thread and the other threads that may
+ * wish to use the client's socket to flush its outgoing queue.
+ *
+ * A non-(e)poll thread may attempt (and succeed) in flushing
+ * the queue before the (e)poll thread gets a chance to service
+ * the client's "out" event.
+ *
+ * In this situation, the (e)poll thread processing the client
+ * out event will see an empty payload: there is nothing to do
+ * except unsubscribing (e)poll "out" events.
+ *
+ * Note that this thread is the (e)poll thread so it can modify
+ * the (e)poll mask directly without using a communication
+ * update command. Other threads that flush the outgoing queue
+ * will use the "communication update" command to wake up this
+ * thread and force it to monitor "out" events.
+ *
+ * When other threads succeed in emptying the outgoing queue,
+ * they don't need to update the (e)poll mask: if the "out"
+ * event is monitored, it will fire once and the (e)poll
+ * thread will reach this condition, causing the event to
+ * stop being monitored.
+ */
+ transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+ } else {
+ transmission_status = client_flush_outgoing_queue(client);
+ }
pthread_mutex_unlock(&client->lock);
ret = client_handle_transmission_status(
bool result = false;
uint64_t threshold;
enum lttng_condition_type condition_type;
- const struct lttng_condition_buffer_usage *use_condition = container_of(
- condition, struct lttng_condition_buffer_usage,
- parent);
+ const struct lttng_condition_buffer_usage *use_condition = lttng::utils::container_of(
+ condition, <tng_condition_buffer_usage::parent);
if (use_condition->threshold_bytes.set) {
threshold = use_condition->threshold_bytes.value;
{
uint64_t threshold;
const struct lttng_condition_session_consumed_size *size_condition =
- container_of(condition,
- struct lttng_condition_session_consumed_size,
- parent);
+ lttng::utils::container_of(condition,
+ <tng_condition_session_consumed_size::parent);
threshold = size_condition->consumed_threshold_bytes.value;
DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
int client_notification_overflow(struct notification_client *client)
{
int ret = 0;
- const struct lttng_notification_channel_message msg = {
- .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
- .size = 0,
- .fds = 0,
- };
+ struct lttng_notification_channel_message msg;
+
+ msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED;
+ msg.size = 0;
+ msg.fds = 0;
ASSERT_LOCKED(client->lock);
.trigger = (struct lttng_trigger *) trigger,
.evaluation = (struct lttng_evaluation *) evaluation,
};
- struct lttng_notification_channel_message msg_header = {
- .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
- .size = 0,
- .fds = 0,
- };
+ struct lttng_notification_channel_message msg_header;
const struct lttng_credentials *trigger_creds =
lttng_trigger_get_credentials(trigger);
lttng_payload_init(&msg_payload);
+ msg_header.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
+ msg_header.size = 0;
+ msg_header.fds = 0;
+
ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
sizeof(msg_header));
if (ret) {
}
evaluation = lttng_evaluation_event_rule_matches_create(
- container_of(lttng_trigger_get_const_condition(
+ lttng::utils::container_of(lttng_trigger_get_const_condition(
element->trigger),
- struct lttng_condition_event_rule_matches,
- parent),
+ <tng_condition_event_rule_matches::parent),
notification->capture_buffer,
notification->capture_buf_size, false);