#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
+#include <inttypes.h>
#include <common/common.h>
#include <common/kernel-ctl/kernel-ctl.h>
call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
}
+/*
+ * Flag a relayd socket pair for destruction. Destroy it if the refcount
+ * reaches zero.
+ *
+ * RCU read side lock MUST be aquired before calling this function.
+ */
+void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
+{
+ assert(relayd);
+
+ /* Set destroy flag for this object */
+ uatomic_set(&relayd->destroy_flag, 1);
+
+ /* Destroy the relayd if refcount is 0 */
+ if (uatomic_read(&relayd->refcount) == 0) {
+ consumer_destroy_relayd(relayd);
+ }
+}
+
/*
* Remove a stream from the global list protected by a mutex. This
* function is also responsible for freeing its data structures.
uatomic_dec(&relayd->refcount);
assert(uatomic_read(&relayd->refcount) >= 0);
+ /* Closing streams requires to lock the control socket. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_send_close_stream(&relayd->control_sock,
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- ERR("Unable to close stream on the relayd. Continuing");
- /* Continue here. There is nothing we can do for the relayd.*/
+ DBG("Unable to close stream on the relayd. Continuing");
+ /*
+ * Continue here. There is nothing we can do for the relayd.
+ * Chances are that the relayd has closed the socket so we just
+ * continue cleaning up.
+ */
}
/* Both conditions are met, we destroy the relayd. */
}
/*
- * Add relayd socket to global consumer data hashtable.
+ * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
+ * be acquired before calling this.
*/
+
int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret = 0;
goto end;
}
- rcu_read_lock();
-
lttng_ht_lookup(consumer_data.relayd_ht,
(void *)((unsigned long) relayd->net_seq_idx), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
- rcu_read_unlock();
/* Relayd already exist. Ignore the insertion */
goto end;
}
lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
- rcu_read_unlock();
-
end:
return ret;
}
*
* Return destination file descriptor or negative value on error.
*/
-int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
- size_t data_size)
+static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
+ size_t data_size, struct consumer_relayd_sock_pair *relayd)
{
int outfd = -1, ret;
- struct consumer_relayd_sock_pair *relayd;
struct lttcomm_relayd_data_hdr data_hdr;
/* Safety net */
assert(stream);
+ assert(relayd);
/* Reset data header */
memset(&data_hdr, 0, sizeof(data_hdr));
- rcu_read_lock();
- /* Get relayd reference of the stream. */
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd == NULL) {
- /* Stream is either local or corrupted */
- goto error;
- }
-
- DBG("Consumer found relayd socks with index %d", stream->net_seq_idx);
if (stream->metadata_flag) {
/* Caller MUST acquire the relayd control socket lock */
ret = relayd_send_metadata(&relayd->control_sock, data_size);
}
error:
- rcu_read_unlock();
return outfd;
}
}
/*
- * Mmap the ring buffer, read it and write the data to the tracefile.
+ * Write the metadata stream id on the specified file descriptor.
+ */
+static int write_relayd_metadata_id(int fd,
+ struct lttng_consumer_stream *stream,
+ struct consumer_relayd_sock_pair *relayd)
+{
+ int ret;
+ uint64_t metadata_id;
+
+ metadata_id = htobe64(stream->relayd_stream_id);
+ do {
+ ret = write(fd, (void *) &metadata_id,
+ sizeof(stream->relayd_stream_id));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("write metadata stream id");
+ goto end;
+ }
+ DBG("Metadata stream id %" PRIu64 " written before data",
+ stream->relayd_stream_id);
+
+end:
+ return ret;
+}
+
+/*
+ * Mmap the ring buffer, read it and write the data to the tracefile. This is a
+ * core function for writing trace buffers to either the local filesystem or
+ * the network.
+ *
+ * Careful review MUST be put if any changes occur!
*
* Returns the number of bytes written
*/
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len)
{
+ unsigned long mmap_offset;
+ ssize_t ret = 0, written = 0;
+ off_t orig_offset = stream->out_fd_offset;
+ /* Default is on the disk */
+ int outfd = stream->out_fd;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+
+ /* RCU lock for the relayd pointer */
+ rcu_read_lock();
+
+ /* Flag that the current stream if set for network streaming. */
+ if (stream->net_seq_idx != -1) {
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd == NULL) {
+ goto end;
+ }
+ }
+
+ /* get the offset inside the fd to mmap */
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
+ ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+ break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
+ ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle,
+ stream->buf, &mmap_offset);
+ break;
default:
ERR("Unknown consumer_data type");
assert(0);
}
+ if (ret != 0) {
+ errno = -ret;
+ PERROR("tracer ctl get_mmap_read_offset");
+ written = ret;
+ goto end;
+ }
- return 0;
+ /* Handle stream on the relayd if the output is on the network */
+ if (relayd) {
+ unsigned long netlen = len;
+
+ /*
+ * Lock the control socket for the complete duration of the function
+ * since from this point on we will use the socket.
+ */
+ if (stream->metadata_flag) {
+ /* Metadata requires the control socket. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ netlen += sizeof(stream->relayd_stream_id);
+ }
+
+ ret = write_relayd_stream_header(stream, netlen, relayd);
+ if (ret >= 0) {
+ /* Use the returned socket. */
+ outfd = ret;
+
+ /* Write metadata stream id before payload */
+ if (stream->metadata_flag) {
+ ret = write_relayd_metadata_id(outfd, stream, relayd);
+ if (ret < 0) {
+ written = ret;
+ goto end;
+ }
+ }
+ }
+ /* Else, use the default set before which is the filesystem. */
+ }
+
+ while (len > 0) {
+ do {
+ ret = write(outfd, stream->mmap_base + mmap_offset, len);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("Error in file write");
+ if (written == 0) {
+ written = ret;
+ }
+ goto end;
+ } else if (ret > len) {
+ PERROR("Error in file write (ret %zd > len %lu)", ret, len);
+ written += ret;
+ goto end;
+ } else {
+ len -= ret;
+ mmap_offset += ret;
+ }
+ DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
+
+ /* This call is useless on a socket so better save a syscall. */
+ if (!relayd) {
+ /* This won't block, but will start writeout asynchronously */
+ lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
+ SYNC_FILE_RANGE_WRITE);
+ stream->out_fd_offset += ret;
+ }
+ written += ret;
+ }
+ lttng_consumer_sync_trace_file(stream, orig_offset);
+
+end:
+ /* Unlock only if ctrl socket used */
+ if (relayd && stream->metadata_flag) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ }
+
+ rcu_read_unlock();
+ return written;
}
/*
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len)
{
+ ssize_t ret = 0, written = 0, ret_splice = 0;
+ loff_t offset = 0;
+ off_t orig_offset = stream->out_fd_offset;
+ int fd = stream->wait_fd;
+ /* Default is on the disk */
+ int outfd = stream->out_fd;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
+ break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
+ /* Not supported for user space tracing */
return -ENOSYS;
default:
ERR("Unknown consumer_data type");
assert(0);
- return -ENOSYS;
}
+ /* RCU lock for the relayd pointer */
+ rcu_read_lock();
+
+ /* Flag that the current stream if set for network streaming. */
+ if (stream->net_seq_idx != -1) {
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd == NULL) {
+ goto end;
+ }
+ }
+
+ /* Write metadata stream id before payload */
+ if (stream->metadata_flag && relayd) {
+ /*
+ * Lock the control socket for the complete duration of the function
+ * since from this point on we will use the socket.
+ */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+
+ ret = write_relayd_metadata_id(ctx->consumer_thread_pipe[1],
+ stream, relayd);
+ if (ret < 0) {
+ written = ret;
+ goto end;
+ }
+ }
+
+ while (len > 0) {
+ DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
+ (unsigned long)offset, len, fd);
+ ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
+ SPLICE_F_MOVE | SPLICE_F_MORE);
+ DBG("splice chan to pipe, ret %zd", ret_splice);
+ if (ret_splice < 0) {
+ PERROR("Error in relay splice");
+ if (written == 0) {
+ written = ret_splice;
+ }
+ ret = errno;
+ goto splice_error;
+ }
+
+ /* Handle stream on the relayd if the output is on the network */
+ if (relayd) {
+ if (stream->metadata_flag) {
+ /* Update counter to fit the spliced data */
+ ret_splice += sizeof(stream->relayd_stream_id);
+ len += sizeof(stream->relayd_stream_id);
+ /*
+ * We do this so the return value can match the len passed as
+ * argument to this function.
+ */
+ written -= sizeof(stream->relayd_stream_id);
+ }
+
+ ret = write_relayd_stream_header(stream, ret_splice, relayd);
+ if (ret >= 0) {
+ /* Use the returned socket. */
+ outfd = ret;
+ } else {
+ ERR("Remote relayd disconnected. Stopping");
+ goto end;
+ }
+ }
+
+ /* Splice data out */
+ ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL,
+ ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
+ DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice);
+ if (ret_splice < 0) {
+ PERROR("Error in file splice");
+ if (written == 0) {
+ written = ret_splice;
+ }
+ ret = errno;
+ goto splice_error;
+ } else if (ret_splice > len) {
+ errno = EINVAL;
+ PERROR("Wrote more data than requested %zd (len: %lu)",
+ ret_splice, len);
+ written += ret_splice;
+ ret = errno;
+ goto splice_error;
+ }
+ len -= ret_splice;
+
+ /* This call is useless on a socket so better save a syscall. */
+ if (!relayd) {
+ /* This won't block, but will start writeout asynchronously */
+ lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
+ SYNC_FILE_RANGE_WRITE);
+ stream->out_fd_offset += ret_splice;
+ }
+ written += ret_splice;
+ }
+ lttng_consumer_sync_trace_file(stream, orig_offset);
+
+ ret = ret_splice;
+
+ goto end;
+
+splice_error:
+ /* send the appropriate error description to sessiond */
+ switch (ret) {
+ case EBADF:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EBADF);
+ break;
+ case EINVAL:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
+ break;
+ case ENOMEM:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
+ break;
+ case ESPIPE:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
+ break;
+ }
+
+end:
+ if (relayd && stream->metadata_flag) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ }
+
+ rcu_read_unlock();
+ return written;
}
/*
metadata_ht);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
- lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
goto restart;
}
perror("Poll error");
- lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
goto end;
} else if (num_rdy == 0) {
DBG("Polling thread timed out");
}
DBG("Sending ready command to lttng-sessiond");
- ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
+ ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
/* return < 0 on error, but == 0 is not fatal */
if (ret < 0) {
ERR("Error sending ready command to lttng-sessiond");
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
}
+
+/*
+ * Process the ADD_RELAYD command receive by a consumer.
+ *
+ * This will create a relayd socket pair and add it to the relayd hash table.
+ * The caller MUST acquire a RCU read side lock before calling it.
+ */
+int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+ struct lttng_consumer_local_data *ctx, int sock,
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
+{
+ int fd, ret = -1;
+ struct consumer_relayd_sock_pair *relayd;
+
+ DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+
+ /* Get relayd reference if exists. */
+ relayd = consumer_find_relayd(net_seq_idx);
+ if (relayd == NULL) {
+ /* Not found. Allocate one. */
+ relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
+ if (relayd == NULL) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ goto error;
+ }
+ }
+
+ /* Poll on consumer socket. */
+ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ ret = -EINTR;
+ goto error;
+ }
+
+ /* Get relayd socket from session daemon */
+ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
+ if (ret != sizeof(fd)) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+ ret = -1;
+ goto error;
+ }
+
+ /* Copy socket information and received FD */
+ switch (sock_type) {
+ case LTTNG_STREAM_CONTROL:
+ /* Copy received lttcomm socket */
+ lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
+ ret = lttcomm_create_sock(&relayd->control_sock);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Close the created socket fd which is useless */
+ close(relayd->control_sock.fd);
+
+ /* Assign new file descriptor */
+ relayd->control_sock.fd = fd;
+ break;
+ case LTTNG_STREAM_DATA:
+ /* Copy received lttcomm socket */
+ lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
+ ret = lttcomm_create_sock(&relayd->data_sock);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Close the created socket fd which is useless */
+ close(relayd->data_sock.fd);
+
+ /* Assign new file descriptor */
+ relayd->data_sock.fd = fd;
+ break;
+ default:
+ ERR("Unknown relayd socket type (%d)", sock_type);
+ goto error;
+ }
+
+ DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
+ sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
+ relayd->net_seq_idx, fd);
+
+ /*
+ * Add relayd socket pair to consumer data hashtable. If object already
+ * exists or on error, the function gracefully returns.
+ */
+ consumer_add_relayd(relayd);
+
+ /* All good! */
+ ret = 0;
+
+error:
+ return ret;
+}