data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
data_hdr.padding_size = htobe32(padding);
- data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
+ /*
+ * Note that net_seq_num below is assigned with the *current* value of
+ * next_net_seq_num and only after that the next_net_seq_num will be
+ * increment. This is why when issuing a command on the relayd using
+ * this next value, 1 should always be substracted in order to compare
+ * the last seen sequence number on the relayd side to the last sent.
+ */
+ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
/* Other fields are zeroed previously */
ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
goto error;
}
+ ++stream->next_net_seq_num;
+
/* Set to go on data socket */
outfd = relayd->data_sock.fd;
}
do {
ret = write(ctx->consumer_should_quit[1], "4", 1);
} while (ret < 0 && errno == EINTR);
- if (ret < 0) {
+ if (ret < 0 || ret != 1) {
PERROR("write consumer quit");
}
do {
ret = write(fd, (void *) &hdr, sizeof(hdr));
} while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write metadata stream id");
+ if (ret < 0 || ret != sizeof(hdr)) {
+ /*
+ * This error means that the fd's end is closed so ignore the perror
+ * not to clubber the error output since this can happen in a normal
+ * code path.
+ */
+ if (errno != EPIPE) {
+ PERROR("write metadata stream id");
+ }
+ DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
+ /*
+ * Set ret to a negative value because if ret != sizeof(hdr), we don't
+ * handle writting the missing part so report that as an error and
+ * don't lie to the caller.
+ */
+ ret = -1;
goto end;
}
DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
} while (ret < 0 && errno == EINTR);
DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
if (ret < 0) {
- PERROR("Error in file write");
+ /*
+ * This is possible if the fd is closed on the other side (outfd)
+ * or any write problem. It can be verbose a bit for a normal
+ * execution if for instance the relayd is stopped abruptly. This
+ * can happen so set this to a DBG statement.
+ */
+ DBG("Error in file write mmap");
if (written == 0) {
written = ret;
}
DBG("Metadata main loop started");
while (1) {
- lttng_poll_reset(&events);
-
- nb_fd = LTTNG_POLL_GETNB(&events);
-
/* Only the metadata pipe is set */
- if (nb_fd == 0 && consumer_quit == 1) {
+ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
goto end;
}
restart:
- DBG("Metadata poll wait with %d fd(s)", nb_fd);
+ DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
ret = lttng_poll_wait(&events, -1);
DBG("Metadata event catched in thread");
if (ret < 0) {
goto error;
}
+ nb_fd = ret;
+
/* From here, the event is a metadata wait fd */
for (i = 0; i < nb_fd; i++) {
revents = LTTNG_POLL_GETEV(&events, i);
if (stream == NULL) {
/* Check for deleted streams. */
validate_endpoint_status_metadata_stream(&events);
- continue;
+ goto restart;
}
DBG("Adding metadata stream %d to poll set",
pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
+ if (pipe_readlen < 0) {
+ PERROR("read consumer data pipe");
+ /* Continue so we can at least handle the current stream(s). */
+ continue;
+ }
/*
* If the stream is NULL, just ignore it. It's also possible that
/* Blocking call, waiting for transmission */
sock = lttcomm_accept_unix_sock(client_socket);
- if (sock <= 0) {
+ if (sock < 0) {
WARN("On accept");
goto end;
}
*/
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+ unsigned int sessiond_id)
{
- int fd = -1, ret = -1;
+ int fd = -1, ret = -1, relayd_created = 0;
+ enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
+ }
+
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ ret = -1;
goto error;
}
+ relayd->sessiond_session_id = (uint64_t) sessiond_id;
+ relayd_created = 1;
}
/* Poll on consumer socket. */
goto error;
}
+ /* We have the fds without error. Send status back. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
+ }
+
/* Copy socket information and received FD */
switch (sock_type) {
case LTTNG_STREAM_CONTROL:
/* Assign new file descriptor */
relayd->control_sock.fd = fd;
+
+ /*
+ * Create a session on the relayd and store the returned id. Lock the
+ * control socket mutex if the relayd was NOT created before.
+ */
+ if (!relayd_created) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ }
+ ret = relayd_create_session(&relayd->control_sock,
+ &relayd->relayd_session_id);
+ if (!relayd_created) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ }
+ if (ret < 0) {
+ goto error;
+ }
+
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
+ ret = -1;
goto error;
}
PERROR("close received socket");
}
}
+
+ if (relayd_created) {
+ /* We just want to cleanup. Ignore ret value. */
+ (void) relayd_close(&relayd->control_sock);
+ (void) relayd_close(&relayd->data_sock);
+ free(relayd);
+ }
+
return ret;
}
return ret;
}
+/*
+ * Search for a relayd associated to the session id and return the reference.
+ *
+ * A rcu read side lock MUST be acquire before calling this function and locked
+ * until the relayd object is no longer necessary.
+ */
+static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
+{
+ struct lttng_ht_iter iter;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+
+ /* Iterate over all relayd since they are indexed by net_seq_idx. */
+ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+ node.node) {
+ /*
+ * Check by sessiond id which is unique here where the relayd session
+ * id might not be when having multiple relayd.
+ */
+ if (relayd->sessiond_session_id == id) {
+ /* Found the relayd. There can be only one per id. */
+ goto found;
+ }
+ }
+
+ return NULL;
+
+found:
+ return relayd;
+}
+
/*
* Check if for a given session id there is still data needed to be extract
* from the buffers.
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
- struct consumer_relayd_sock_pair *relayd;
+ struct consumer_relayd_sock_pair *relayd = NULL;
int (*data_pending)(struct lttng_consumer_stream *);
DBG("Consumer data pending command on session id %" PRIu64, id);
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
+ relayd = find_relayd_by_session_id(id);
+ if (relayd) {
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_begin_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ /* Communication error thus the relayd so no data pending. */
+ goto data_not_pending;
+ }
+ }
+
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
ht->match_fct, (void *)((unsigned long) id),
/* If this call fails, the stream is being used hence data pending. */
ret = stream_try_lock(stream);
if (!ret) {
- goto data_not_pending;
+ goto data_pending;
}
/*
ret = data_pending(stream);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
+ goto data_pending;
}
}
/* Relayd check */
- if (stream->net_seq_idx != -1) {
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (!relayd) {
- /*
- * At this point, if the relayd object is not available for the
- * given stream, it is because the relayd is being cleaned up
- * so every stream associated with it (for a session id value)
- * are or will be marked for deletion hence no data pending.
- */
- pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
- }
-
+ if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (stream->metadata_flag) {
- ret = relayd_quiescent_control(&relayd->control_sock);
+ ret = relayd_quiescent_control(&relayd->control_sock,
+ stream->relayd_stream_id);
} else {
ret = relayd_data_pending(&relayd->control_sock,
- stream->relayd_stream_id, stream->next_net_seq_num);
+ stream->relayd_stream_id,
+ stream->next_net_seq_num - 1);
}
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
+ goto data_pending;
}
}
pthread_mutex_unlock(&stream->lock);
}
+ if (relayd) {
+ unsigned int is_data_inflight = 0;
+
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_end_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id, &is_data_inflight);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto data_not_pending;
+ }
+ if (is_data_inflight) {
+ goto data_pending;
+ }
+ }
+
/*
- * Finding _no_ node in the hash table means that the stream(s) have been
- * removed thus data is guaranteed to be available for analysis from the
- * trace files. This is *only* true for local consumer and not network
- * streaming.
+ * Finding _no_ node in the hash table and no inflight data means that the
+ * stream(s) have been removed thus data is guaranteed to be available for
+ * analysis from the trace files.
*/
+data_not_pending:
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
return 0;
-data_not_pending:
+data_pending:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
return 1;
}
+
+/*
+ * Send a ret code status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_msg(int sock, int ret_code)
+{
+ struct lttcomm_consumer_status_msg msg;
+
+ msg.ret_code = ret_code;
+
+ return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}