* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
+#include "common/buffer-view.h"
+#include <stdint.h>
#define _LGPL_SOURCE
#include <assert.h>
#include <poll.h>
return ret;
}
+static
+int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
+ const char **addr)
+{
+ int ret;
+ unsigned long mmap_offset;
+ const char *mmap_base = stream->mmap_base;
+
+ ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+ if (ret < 0) {
+ PERROR("Failed to get mmap read offset");
+ goto error;
+ }
+
+ *addr = mmap_base + mmap_offset;
+error:
+ return ret;
+}
+
/*
* Take a snapshot of all the stream of a channel
*
* Assign the received relayd ID so we can use it for streaming. The streams
* are not visible to anyone so this is OK to change it.
*/
- stream->net_seq_idx = relayd_id;
+ stream->relayd_id = relayd_id;
channel->relayd_id = relayd_id;
if (relayd_id != (uint64_t) -1ULL) {
ret = consumer_send_relayd_stream(stream, path);
while (consumed_pos < produced_pos) {
ssize_t read_len;
unsigned long len, padded_len;
+ const char *subbuf_addr;
+ struct lttng_buffer_view subbuf_view;
health_code_update();
-
DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
goto error_put_subbuf;
}
- read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
+ ret = get_current_subbuf_addr(stream, &subbuf_addr);
+ if (ret) {
+ goto error_put_subbuf;
+ }
+
+ subbuf_view = lttng_buffer_view_init(
+ subbuf_addr, 0, padded_len);
+ read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+ stream, &subbuf_view,
padded_len - len, NULL);
/*
* We write the padded len in local tracefiles but the data len
}
} else {
close_relayd_stream(stream);
- stream->net_seq_idx = (uint64_t) -1ULL;
+ stream->relayd_id = (uint64_t) -1ULL;
}
pthread_mutex_unlock(&stream->lock);
}
if (use_relayd) {
close_relayd_stream(metadata_stream);
- metadata_stream->net_seq_idx = (uint64_t) -1ULL;
+ metadata_stream->relayd_id = (uint64_t) -1ULL;
} else {
if (metadata_stream->out_fd >= 0) {
ret = close(metadata_stream->out_fd);
health_code_update();
- new_stream = consumer_allocate_stream(channel->key,
+ pthread_mutex_lock(&channel->lock);
+ new_stream = consumer_allocate_stream(
+ channel,
+ channel->key,
fd,
LTTNG_CONSUMER_ACTIVE_STREAM,
channel->name,
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
break;
}
+ pthread_mutex_unlock(&channel->lock);
goto end_nosignal;
}
- new_stream->chan = channel;
new_stream->wait_fd = fd;
switch (channel->output) {
case CONSUMER_CHANNEL_SPLICE:
if (!channel->monitor) {
DBG("Kernel consumer add stream %s in no monitor mode with "
"relayd id %" PRIu64, new_stream->name,
- new_stream->net_seq_idx);
+ new_stream->relayd_id);
cds_list_add(&new_stream->send_node, &channel->streams.head);
+ pthread_mutex_unlock(&channel->lock);
break;
}
/* Send stream to relayd if the stream has an ID. */
- if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+ if (new_stream->relayd_id != (uint64_t) -1ULL) {
ret = consumer_send_relayd_stream(new_stream,
new_stream->chan->pathname);
if (ret < 0) {
+ pthread_mutex_unlock(&channel->lock);
consumer_stream_free(new_stream);
goto end_nosignal;
}
*/
if (channel->streams_sent_to_relayd) {
ret = consumer_send_relayd_streams_sent(
- new_stream->net_seq_idx);
+ new_stream->relayd_id);
if (ret < 0) {
+ pthread_mutex_unlock(&channel->lock);
goto end_nosignal;
}
}
}
+ pthread_mutex_unlock(&channel->lock);
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
}
break;
case CONSUMER_CHANNEL_MMAP:
+ {
+ const char *subbuf_addr;
+ struct lttng_buffer_view subbuf_view;
+
/* Get subbuffer size without padding */
err = kernctl_get_subbuf_size(infd, &subbuf_size);
if (err != 0) {
goto end;
}
+ ret = get_current_subbuf_addr(stream, &subbuf_addr);
+ if (ret) {
+ goto error_put_subbuf;
+ }
+
/* Make sure the tracer is not gone mad on us! */
assert(len >= subbuf_size);
padding = len - subbuf_size;
+ subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
+
/* write the subbuffer to the tracefile */
- ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size,
- padding, &index);
+ ret = lttng_consumer_on_read_subbuffer_mmap(
+ ctx, stream, &subbuf_view, padding, &index);
/*
- * The mmap operation should write subbuf_size amount of data when
- * network streaming or the full padding (len) size when we are _not_
- * streaming.
+ * The mmap operation should write subbuf_size amount of data
+ * when network streaming or the full padding (len) size when we
+ * are _not_ streaming.
*/
- if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
- (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
+ if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) ||
+ (ret != len && stream->relayd_id == (uint64_t) -1ULL)) {
/*
* Display the error but continue processing to try to release the
* subbuffer. This is a DBG statement since this is possible to
write_index = 0;
}
break;
+ }
default:
ERR("Unknown output method");
ret = -EPERM;
}
-
+error_put_subbuf:
err = kernctl_put_next_subbuf(infd);
if (err != 0) {
if (err == -EFAULT) {
* Don't create anything if this is set for streaming or should not be
* monitored.
*/
- if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
+ if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) {
ret = utils_create_stream_file(stream->chan->pathname, stream->name,
stream->chan->tracefile_size, stream->tracefile_count_current,
stream->uid, stream->gid, NULL);