ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret != sizeof(msg)) {
- lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
return ret;
}
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
msg.u.channel.mmap_len,
msg.u.channel.max_sb_size);
if (new_channel == NULL) {
- lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto end_nosignal;
}
if (ctx->on_recv_channel != NULL) {
/* Get stream file descriptor from socket */
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
- lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
rcu_read_unlock();
return ret;
}
msg.u.stream.net_index,
msg.u.stream.metadata_flag);
if (new_stream == NULL) {
- lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto end_nosignal;
}
+ /*
+ * The buffer flush is done on the session daemon side for the kernel
+ * so no need for the stream "hangup_flush_done" variable to be
+ * tracked. This is important for a kernel stream since we don't rely
+ * on the flush state of the stream to read data. It's not the case for
+ * user space tracing.
+ */
+ new_stream->hangup_flush_done = 0;
+
/* The stream is not metadata. Get relayd reference if exists. */
relayd = consumer_find_relayd(msg.u.stream.net_index);
if (relayd != NULL) {
goto end_nosignal;
}
- if (ctx->on_recv_stream != NULL) {
- ret = ctx->on_recv_stream(new_stream);
- if (ret == 0) {
- consumer_add_stream(new_stream);
- } else if (ret < 0) {
- goto end_nosignal;
+ /* Send stream to the metadata thread */
+ if (new_stream->metadata_flag) {
+ if (ctx->on_recv_stream) {
+ ret = ctx->on_recv_stream(new_stream);
+ if (ret < 0) {
+ goto end_nosignal;
+ }
+ }
+
+ do {
+ ret = write(ctx->consumer_metadata_pipe[1], new_stream,
+ sizeof(struct lttng_consumer_stream));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("write metadata pipe");
}
} else {
+ if (ctx->on_recv_stream) {
+ ret = ctx->on_recv_stream(new_stream);
+ if (ret < 0) {
+ goto end_nosignal;
+ }
+ }
consumer_add_stream(new_stream);
}
} while (ret < 0 && errno == EINTR);
end_nosignal:
rcu_read_unlock();
- return 0;
+
+ /*
+ * Return 1 to indicate success since the 0 value can be a socket
+ * shutdown during the recv() or send() call.
+ */
+ return 1;
}
/*
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
+ ret = -err;
/*
* This is a debug message even for single-threaded consumer,
* because poll() have more relaxed criterions than get subbuf,
/* read the whole subbuffer */
err = kernctl_get_padded_subbuf_size(infd, &len);
if (err != 0) {
- errno = -ret;
+ errno = -err;
perror("Getting sub-buffer len failed.");
+ ret = -err;
goto end;
}
/* read the used subbuffer size */
err = kernctl_get_padded_subbuf_size(infd, &len);
if (err != 0) {
- errno = -ret;
+ errno = -err;
perror("Getting sub-buffer len failed.");
+ ret = -err;
goto end;
}
/* write the subbuffer to the tracefile */
err = kernctl_put_next_subbuf(infd);
if (err != 0) {
- errno = -ret;
+ errno = -err;
if (errno == EFAULT) {
perror("Error in unreserving sub buffer\n");
} else if (errno == EIO) {
/* Should never happen with newer LTTng versions */
perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
+
+ ret = -err;
goto end;
}