static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
-/*
- * This hash table contains the mapping between the session id of the sessiond
- * and the relayd session id. Element of the ht are indexed by sessiond session
- * id.
- *
- * Node can be added when a relayd communication is opened in the sessiond
- * thread.
- *
- * Note that a session id of the session daemon is unique to a tracing session
- * and not to a domain session. However, a domain session has one consumer
- * which forces the 1-1 mapping between a consumer and a domain session (ex:
- * UST). This means that we can't have duplicate in this ht.
- */
-static struct lttng_ht *relayd_session_id_ht;
-
/*
* Notify a thread pipe to poll back again. This usually means that some global
* state has changed so we just send back the thread in a poll wait call.
{
int ret;
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
if (relayd == NULL) {
return;
DBG("Consumer destroy and close relayd socket pair");
- /* Loockup for a relayd node in the session id map hash table. */
- lttng_ht_lookup(relayd_session_id_ht,
- (void *)((unsigned long) relayd->sessiond_session_id), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
- if (node != NULL) {
- /* We assume the relayd is being or is destroyed */
- return;
- }
-
- /*
- * Try to delete it from the relayd session id ht. The return value is of
- * no importance since either way we are going to try to delete the relayd
- * from the global relayd_ht.
- */
- lttng_ht_del(relayd_session_id_ht, &iter);
-
iter.iter.node = &relayd->node.node;
ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
if (ret != 0) {
}
lttng_ht_destroy(consumer_data.relayd_ht);
- /* The destroy_relayd call makes sure that this ht is empty here. */
- lttng_ht_destroy(relayd_session_id_ht);
rcu_read_unlock();
}
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
data_hdr.padding_size = htobe32(padding);
- data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
+ /*
+ * Note that net_seq_num below is assigned with the *current* value of
+ * next_net_seq_num and only after that the next_net_seq_num will be
+ * increment. This is why when issuing a command on the relayd using
+ * this next value, 1 should always be substracted in order to compare
+ * the last seen sequence number on the relayd side to the last sent.
+ */
+ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
/* Other fields are zeroed previously */
ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
goto error;
}
+ ++stream->next_net_seq_num;
+
/* Set to go on data socket */
outfd = relayd->data_sock.fd;
}
do {
ret = write(ctx->consumer_should_quit[1], "4", 1);
} while (ret < 0 && errno == EINTR);
- if (ret < 0) {
+ if (ret < 0 || ret != 1) {
PERROR("write consumer quit");
}
do {
ret = write(fd, (void *) &hdr, sizeof(hdr));
} while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write metadata stream id");
+ if (ret < 0 || ret != sizeof(hdr)) {
+ /*
+ * This error means that the fd's end is closed so ignore the perror
+ * not to clubber the error output since this can happen in a normal
+ * code path.
+ */
+ if (errno != EPIPE) {
+ PERROR("write metadata stream id");
+ }
+ DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
+ /*
+ * Set ret to a negative value because if ret != sizeof(hdr), we don't
+ * handle writting the missing part so report that as an error and
+ * don't lie to the caller.
+ */
+ ret = -1;
goto end;
}
DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
} while (ret < 0 && errno == EINTR);
DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
if (ret < 0) {
- PERROR("Error in file write");
+ /*
+ * This is possible if the fd is closed on the other side (outfd)
+ * or any write problem. It can be verbose a bit for a normal
+ * execution if for instance the relayd is stopped abruptly. This
+ * can happen so set this to a DBG statement.
+ */
+ DBG("Error in file write mmap");
if (written == 0) {
written = ret;
}
DBG("Metadata main loop started");
while (1) {
- lttng_poll_reset(&events);
-
- nb_fd = LTTNG_POLL_GETNB(&events);
-
/* Only the metadata pipe is set */
- if (nb_fd == 0 && consumer_quit == 1) {
+ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
goto end;
}
restart:
- DBG("Metadata poll wait with %d fd(s)", nb_fd);
+ DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
ret = lttng_poll_wait(&events, -1);
DBG("Metadata event catched in thread");
if (ret < 0) {
goto error;
}
+ nb_fd = ret;
+
/* From here, the event is a metadata wait fd */
for (i = 0; i < nb_fd; i++) {
revents = LTTNG_POLL_GETEV(&events, i);
if (stream == NULL) {
/* Check for deleted streams. */
validate_endpoint_status_metadata_stream(&events);
- continue;
+ goto restart;
}
DBG("Adding metadata stream %d to poll set",
pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
+ if (pipe_readlen < 0) {
+ PERROR("read consumer data pipe");
+ /* Continue so we can at least handle the current stream(s). */
+ continue;
+ }
/*
* If the stream is NULL, just ignore it. It's also possible that
/* Blocking call, waiting for transmission */
sock = lttcomm_accept_unix_sock(client_socket);
- if (sock <= 0) {
+ if (sock < 0) {
WARN("On accept");
goto end;
}
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
}
/*
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
unsigned int sessiond_id)
{
- int fd = -1, ret = -1;
+ int fd = -1, ret = -1, relayd_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
- struct consumer_relayd_session_id *relayd_id_node;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ ret = -1;
goto error;
}
relayd->sessiond_session_id = (uint64_t) sessiond_id;
+ relayd_created = 1;
}
/* Poll on consumer socket. */
relayd->control_sock.fd = fd;
/*
- * Create a session on the relayd and store the returned id. No need to
- * grab the socket lock since the relayd object is not yet visible.
+ * Create a session on the relayd and store the returned id. Lock the
+ * control socket mutex if the relayd was NOT created before.
*/
+ if (!relayd_created) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ }
ret = relayd_create_session(&relayd->control_sock,
&relayd->relayd_session_id);
- if (ret < 0) {
- goto error;
+ if (!relayd_created) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
-
- /* Set up a relayd session id node. */
- relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id));
- if (!relayd_id_node) {
- PERROR("zmalloc relayd id node");
+ if (ret < 0) {
goto error;
}
- relayd_id_node->relayd_id = relayd->relayd_session_id;
- relayd_id_node->sessiond_id = (uint64_t) sessiond_id;
-
- /* Indexed by session id of the sessiond. */
- lttng_ht_node_init_ulong(&relayd_id_node->node,
- relayd_id_node->sessiond_id);
- rcu_read_lock();
- lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node);
- rcu_read_unlock();
-
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
+ ret = -1;
goto error;
}
PERROR("close received socket");
}
}
+
+ if (relayd_created) {
+ /* We just want to cleanup. Ignore ret value. */
+ (void) relayd_close(&relayd->control_sock);
+ (void) relayd_close(&relayd->data_sock);
+ free(relayd);
+ }
+
return ret;
}
static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
{
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
struct consumer_relayd_sock_pair *relayd = NULL;
- struct consumer_relayd_session_id *session_id_map;
-
- /* Get the session id map. */
- lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
- if (node == NULL) {
- goto end;
- }
-
- session_id_map = caa_container_of(node, struct consumer_relayd_session_id,
- node);
/* Iterate over all relayd since they are indexed by net_seq_idx. */
cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
node.node) {
- if (relayd->relayd_session_id == session_id_map->relayd_id) {
+ /*
+ * Check by sessiond id which is unique here where the relayd session
+ * id might not be when having multiple relayd.
+ */
+ if (relayd->sessiond_session_id == id) {
/* Found the relayd. There can be only one per id. */
- break;
+ goto found;
}
}
-end:
+ return NULL;
+
+found:
return relayd;
}
if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (stream->metadata_flag) {
- ret = relayd_quiescent_control(&relayd->control_sock);
+ ret = relayd_quiescent_control(&relayd->control_sock,
+ stream->relayd_stream_id);
} else {
ret = relayd_data_pending(&relayd->control_sock,
- stream->relayd_stream_id, stream->next_net_seq_num);
+ stream->relayd_stream_id,
+ stream->next_net_seq_num - 1);
}
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret == 1) {
ret = relayd_end_data_pending(&relayd->control_sock,
relayd->relayd_session_id, &is_data_inflight);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0 || !is_data_inflight) {
- /* On error or if NO data inflight, no data is pending. */
+ if (ret < 0) {
goto data_not_pending;
}
+ if (is_data_inflight) {
+ goto data_pending;
+ }
}
/*