* Return destination file descriptor or negative value on error.
*/
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
- size_t data_size, struct consumer_relayd_sock_pair *relayd)
+ size_t data_size, unsigned long padding,
+ struct consumer_relayd_sock_pair *relayd)
{
int outfd = -1, ret;
struct lttcomm_relayd_data_hdr data_hdr;
/* Set header with stream information */
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
+ data_hdr.padding_size = htobe32(padding);
data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
/* Other fields are zeroed previously */
*/
static int write_relayd_metadata_id(int fd,
struct lttng_consumer_stream *stream,
- struct consumer_relayd_sock_pair *relayd)
+ struct consumer_relayd_sock_pair *relayd,
+ unsigned long padding)
{
int ret;
- uint64_t metadata_id;
+ struct lttcomm_relayd_metadata_payload hdr;
- metadata_id = htobe64(stream->relayd_stream_id);
+ hdr.stream_id = htobe64(stream->relayd_stream_id);
+ hdr.padding_size = htobe32(padding);
do {
- ret = write(fd, (void *) &metadata_id,
- sizeof(stream->relayd_stream_id));
+ ret = write(fd, (void *) &hdr, sizeof(hdr));
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
PERROR("write metadata stream id");
goto end;
}
- DBG("Metadata stream id %" PRIu64 " written before data",
- stream->relayd_stream_id);
+ DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
+ stream->relayd_stream_id, padding);
end:
return ret;
*/
ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len)
+ struct lttng_consumer_stream *stream, unsigned long len,
+ unsigned long padding)
{
unsigned long mmap_offset;
ssize_t ret = 0, written = 0;
if (stream->metadata_flag) {
/* Metadata requires the control socket. */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- netlen += sizeof(stream->relayd_stream_id);
+ netlen += sizeof(struct lttcomm_relayd_metadata_payload);
}
- ret = write_relayd_stream_header(stream, netlen, relayd);
+ ret = write_relayd_stream_header(stream, netlen, padding, relayd);
if (ret >= 0) {
/* Use the returned socket. */
outfd = ret;
/* Write metadata stream id before payload */
if (stream->metadata_flag) {
- ret = write_relayd_metadata_id(outfd, stream, relayd);
+ ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
if (ret < 0) {
written = ret;
goto end;
}
}
/* Else, use the default set before which is the filesystem. */
+ } else {
+ /* No streaming, we have to set the len with the full padding */
+ len += padding;
}
while (len > 0) {
do {
ret = write(outfd, stream->mmap_base + mmap_offset, len);
} while (ret < 0 && errno == EINTR);
+ DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
if (ret < 0) {
PERROR("Error in file write");
if (written == 0) {
len -= ret;
mmap_offset += ret;
}
- DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
*/
ssize_t lttng_consumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len)
+ struct lttng_consumer_stream *stream, unsigned long len,
+ unsigned long padding)
{
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
}
/* Write metadata stream id before payload */
- if (stream->metadata_flag && relayd) {
- /*
- * Lock the control socket for the complete duration of the function
- * since from this point on we will use the socket.
- */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ if (relayd) {
+ int total_len = len;
- ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd);
- if (ret < 0) {
- written = ret;
+ if (stream->metadata_flag) {
+ /*
+ * Lock the control socket for the complete duration of the function
+ * since from this point on we will use the socket.
+ */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+
+ ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
+ padding);
+ if (ret < 0) {
+ written = ret;
+ goto end;
+ }
+
+ total_len += sizeof(struct lttcomm_relayd_metadata_payload);
+ }
+
+ ret = write_relayd_stream_header(stream, total_len, padding, relayd);
+ if (ret >= 0) {
+ /* Use the returned socket. */
+ outfd = ret;
+ } else {
+ ERR("Remote relayd disconnected. Stopping");
goto end;
}
+ } else {
+ /* No streaming, we have to set the len with the full padding */
+ len += padding;
}
while (len > 0) {
- DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
- (unsigned long)offset, len, fd);
+ DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
+ (unsigned long)offset, len, fd, splice_pipe[1]);
ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("splice chan to pipe, ret %zd", ret_splice);
/* Handle stream on the relayd if the output is on the network */
if (relayd) {
if (stream->metadata_flag) {
+ size_t metadata_payload_size =
+ sizeof(struct lttcomm_relayd_metadata_payload);
+
/* Update counter to fit the spliced data */
- ret_splice += sizeof(stream->relayd_stream_id);
- len += sizeof(stream->relayd_stream_id);
+ ret_splice += metadata_payload_size;
+ len += metadata_payload_size;
/*
* We do this so the return value can match the len passed as
* argument to this function.
*/
- written -= sizeof(stream->relayd_stream_id);
- }
-
- ret = write_relayd_stream_header(stream, ret_splice, relayd);
- if (ret >= 0) {
- /* Use the returned socket. */
- outfd = ret;
- } else {
- ERR("Remote relayd disconnected. Stopping");
- goto end;
+ written -= metadata_payload_size;
}
}
/* Splice data out */
ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
- DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice);
+ DBG("Consumer splice pipe to file, ret %zd", ret_splice);
if (ret_splice < 0) {
PERROR("Error in file splice");
if (written == 0) {