*/
#define _GNU_SOURCE
+#include <assert.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
-#include <urcu/list.h>
+#include <lttng-kernel-ctl.h>
+#include <lttng-sessiond-comm.h>
#include <lttng/lttng-kconsumerd.h>
-
-#include "kernelctl.h"
-#include "lttngerr.h"
-#include "lttng-sessiond-comm.h"
+#include <lttngerr.h>
static struct lttng_kconsumerd_global_data {
/*
}
/*
- * Add a fd to the global list protected by a mutex.
+ * Allocate and populate a struct lttng_kconsumerd_fd from the
+ * stream information received on the receiving socket
*/
-static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
+struct lttng_kconsumerd_fd *kconsumerd_allocate_fd(
+ struct lttcomm_kconsumerd_msg *buf,
int consumerd_fd)
{
struct lttng_kconsumerd_fd *tmp_fd;
- int ret = 0;
- pthread_mutex_lock(&kconsumerd_data.lock);
- /* Check if already exist */
- ret = kconsumerd_find_session_fd(buf->fd);
- if (ret == 1) {
+ tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
+ if (tmp_fd == NULL) {
+ perror("malloc struct lttng_kconsumerd_fd");
goto end;
}
- tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
tmp_fd->sessiond_fd = buf->fd;
tmp_fd->consumerd_fd = consumerd_fd;
tmp_fd->state = buf->state;
tmp_fd->output = buf->output;
strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
tmp_fd->path_name[PATH_MAX - 1] = '\0';
+ DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d)",
+ tmp_fd->path_name, tmp_fd->sessiond_fd,
+ tmp_fd->consumerd_fd);
- /* Opening the tracefile in write mode */
- if (tmp_fd->path_name != NULL) {
- ret = open(tmp_fd->path_name,
- O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
- if (ret < 0) {
- ERR("Opening %s", tmp_fd->path_name);
- perror("open");
- goto end;
- }
- tmp_fd->out_fd = ret;
- DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
- tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
- }
+end:
+ return tmp_fd;
+}
- if (tmp_fd->output == LTTNG_EVENT_MMAP) {
- /* get the len of the mmap region */
- ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len);
- if (ret != 0) {
- ret = errno;
- perror("kernctl_get_mmap_len");
- goto end;
- }
+/*
+ * Add a fd to the global list protected by a mutex.
+ */
+static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd)
+{
+ int ret;
- tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len,
- PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0);
- if (tmp_fd->mmap_base == MAP_FAILED) {
- perror("Error mmaping");
- ret = -1;
- goto end;
- }
+ pthread_mutex_lock(&kconsumerd_data.lock);
+ /* Check if already exist */
+ ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd);
+ if (ret == 1) {
+ goto end;
}
-
cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
kconsumerd_data.fds_count++;
kconsumerd_data.need_update = 1;
+
end:
pthread_mutex_unlock(&kconsumerd_data.lock);
return ret;
enum lttng_kconsumerd_command cmd_type)
{
struct iovec iov[1];
- int ret = 0, i, tmp2;
+ int ret = 0, i, j, tmp2;
struct cmsghdr *cmsg;
int nb_fd;
char recv_fd[CMSG_SPACE(sizeof(int))];
struct lttcomm_kconsumerd_msg lkm;
+ struct lttng_kconsumerd_fd *new_fd;
+ union {
+ unsigned char vc[4];
+ int vi;
+ } tmp;
/* the number of fds we are about to receive */
nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
switch (cmd_type) {
case ADD_STREAM:
- DBG("kconsumerd_add_fd %s (%d)", lkm.path_name,
- ((int *) CMSG_DATA(cmsg))[0]);
-
- ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
- if (ret < 0) {
+ for (j = 0; j < sizeof(int); j++)
+ tmp.vc[j] = CMSG_DATA(cmsg)[j];
+ DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, tmp.vi);
+ new_fd = kconsumerd_allocate_fd(&lkm, tmp.vi);
+ if (new_fd == NULL) {
lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR);
goto end;
}
+
+ if (ctx->on_recv_fd != NULL) {
+ ret = ctx->on_recv_fd(new_fd);
+ if (ret == 0) {
+ kconsumerd_add_fd(new_fd);
+ } else if (ret < 0) {
+ free(new_fd);
+ goto end;
+ }
+ } else {
+ kconsumerd_add_fd(new_fd);
+ }
break;
case UPDATE_STREAM:
- kconsumerd_change_fd_state(lkm.fd, lkm.state);
+ if (ctx->on_update_fd != NULL) {
+ ret = ctx->on_update_fd(lkm.fd, lkm.state);
+ if (ret == 0) {
+ kconsumerd_change_fd_state(lkm.fd, lkm.state);
+ } else if (ret < 0) {
+ goto end;
+ }
+ } else {
+ kconsumerd_change_fd_state(lkm.fd, lkm.state);
+ }
break;
default:
break;
ctx->kconsumerd_command_sock_path = sock;
}
+static void lttng_kconsumerd_sync_trace_file(
+ struct lttng_kconsumerd_fd *kconsumerd_fd, off_t orig_offset)
+{
+ int outfd = kconsumerd_fd->out_fd;
+ /*
+ * This does a blocking write-and-wait on any page that belongs to the
+ * subbuffer prior to the one we just wrote.
+ * Don't care about error values, as these are just hints and ways to
+ * limit the amount of page cache used.
+ */
+ if (orig_offset >= kconsumerd_fd->max_sb_size) {
+ sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
+ kconsumerd_fd->max_sb_size,
+ SYNC_FILE_RANGE_WAIT_BEFORE
+ | SYNC_FILE_RANGE_WRITE
+ | SYNC_FILE_RANGE_WAIT_AFTER);
+ /*
+ * Give hints to the kernel about how we access the file:
+ * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
+ * we write it.
+ *
+ * We need to call fadvise again after the file grows because the
+ * kernel does not seem to apply fadvise to non-existing parts of the
+ * file.
+ *
+ * Call fadvise _after_ having waited for the page writeback to
+ * complete because the dirty page writeback semantic is not well
+ * defined. So it can be expected to lead to lower throughput in
+ * streaming.
+ */
+ posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
+ kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
+ }
+}
+
+
/*
* Mmap the ring buffer, read it and write the data to the tracefile.
*
struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
unsigned long mmap_offset;
- char *padding = NULL;
long ret = 0;
off_t orig_offset = kconsumerd_fd->out_fd_offset;
int fd = kconsumerd_fd->consumerd_fd;
kconsumerd_fd->out_fd_offset += ret;
}
- /*
- * This does a blocking write-and-wait on any page that belongs to the
- * subbuffer prior to the one we just wrote.
- * Don't care about error values, as these are just hints and ways to
- * limit the amount of page cache used.
- */
- if (orig_offset >= kconsumerd_fd->max_sb_size) {
- sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
- kconsumerd_fd->max_sb_size,
- SYNC_FILE_RANGE_WAIT_BEFORE
- | SYNC_FILE_RANGE_WRITE
- | SYNC_FILE_RANGE_WAIT_AFTER);
+ lttng_kconsumerd_sync_trace_file(kconsumerd_fd, orig_offset);
- /*
- * Give hints to the kernel about how we access the file:
- * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
- * we write it.
- *
- * We need to call fadvise again after the file grows because the
- * kernel does not seem to apply fadvise to non-existing parts of the
- * file.
- *
- * Call fadvise _after_ having waited for the page writeback to
- * complete because the dirty page writeback semantic is not well
- * defined. So it can be expected to lead to lower throughput in
- * streaming.
- */
- posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
- kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
- }
goto end;
end:
- if (padding != NULL) {
- free(padding);
- }
return ret;
}
SYNC_FILE_RANGE_WRITE);
kconsumerd_fd->out_fd_offset += ret;
}
+ lttng_kconsumerd_sync_trace_file(kconsumerd_fd, orig_offset);
- /*
- * This does a blocking write-and-wait on any page that belongs to the
- * subbuffer prior to the one we just wrote.
- * Don't care about error values, as these are just hints and ways to
- * limit the amount of page cache used.
- */
- if (orig_offset >= kconsumerd_fd->max_sb_size) {
- sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
- kconsumerd_fd->max_sb_size,
- SYNC_FILE_RANGE_WAIT_BEFORE
- | SYNC_FILE_RANGE_WRITE
- | SYNC_FILE_RANGE_WAIT_AFTER);
- /*
- * Give hints to the kernel about how we access the file:
- * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
- * we write it.
- *
- * We need to call fadvise again after the file grows because the
- * kernel does not seem to apply fadvise to non-existing parts of the
- * file.
- *
- * Call fadvise _after_ having waited for the page writeback to
- * complete because the dirty page writeback semantic is not well
- * defined. So it can be expected to lead to lower throughput in
- * streaming.
- */
- posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
- kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
- }
goto end;
splice_error:
return ret;
}
+/*
+ * Take a snapshot for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumerd_take_snapshot(struct lttng_kconsumerd_local_data *ctx,
+ struct lttng_kconsumerd_fd *kconsumerd_fd)
+{
+ int ret = 0;
+ int infd = kconsumerd_fd->consumerd_fd;
+
+ ret = kernctl_snapshot(infd);
+ if (ret != 0) {
+ ret = -errno;
+ perror("Getting sub-buffer snapshot.");
+ }
+
+ return ret;
+}
+
+/*
+ * Get the produced position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumerd_get_produced_snapshot(
+ struct lttng_kconsumerd_local_data *ctx,
+ struct lttng_kconsumerd_fd *kconsumerd_fd,
+ unsigned long *pos)
+{
+ int ret;
+ int infd = kconsumerd_fd->consumerd_fd;
+
+ ret = kernctl_snapshot_get_produced(infd, pos);
+ if (ret != 0) {
+ ret = -errno;
+ perror("kernctl_snapshot_get_produced");
+ }
+
+ return ret;
+}
+
/*
* Poll on the should_quit pipe and the command socket return -1 on error and
* should exit, 0 if data is available on the command socket
* Returns a pointer to the new context or NULL on error.
*/
struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
- int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd))
+ int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+ int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+ int (*update_fd)(int sessiond_fd, uint32_t state))
{
- int ret;
+ int ret, i;
struct lttng_kconsumerd_local_data *ctx;
ctx = malloc(sizeof(struct lttng_kconsumerd_local_data));
if (ctx == NULL) {
perror("allocating context");
- goto end;
+ goto error;
}
+ ctx->kconsumerd_error_socket = -1;
+ /* assign the callbacks */
ctx->on_buffer_ready = buffer_ready;
+ ctx->on_recv_fd = recv_fd;
+ ctx->on_update_fd = update_fd;
ret = pipe(ctx->kconsumerd_poll_pipe);
if (ret < 0) {
perror("Error creating poll pipe");
- ctx = NULL;
- goto end;
+ goto error_poll_pipe;
}
ret = pipe(ctx->kconsumerd_should_quit);
if (ret < 0) {
perror("Error creating recv pipe");
- ctx = NULL;
- goto end;
+ goto error_quit_pipe;
}
ret = pipe(ctx->kconsumerd_thread_pipe);
if (ret < 0) {
perror("Error creating thread pipe");
- ctx = NULL;
- goto end;
+ goto error_thread_pipe;
}
-end:
return ctx;
+
+
+error_thread_pipe:
+ for (i = 0; i < 2; i++) {
+ int err;
+
+ err = close(ctx->kconsumerd_should_quit[i]);
+ assert(!err);
+ }
+error_quit_pipe:
+ for (i = 0; i < 2; i++) {
+ int err;
+
+ err = close(ctx->kconsumerd_poll_pipe[i]);
+ assert(!err);
+ }
+error_poll_pipe:
+ free(ctx);
+error:
+ return NULL;
}
/*
DBG("Sending ready command to ltt-sessiond");
ret = lttng_kconsumerd_send_error(ctx, KCONSUMERD_COMMAND_SOCK_READY);
+ /* return < 0 on error, but == 0 is not fatal */
if (ret < 0) {
ERR("Error sending ready command to ltt-sessiond");
goto end;
/*
* Send return code to the session daemon.
+ * If the socket is not defined, we return 0, it is not a fatal error
*/
int lttng_kconsumerd_send_error(
struct lttng_kconsumerd_local_data *ctx, int cmd)