UST 2.0 support
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sun, 23 Oct 2011 01:06:11 +0000 (21:06 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sun, 23 Oct 2011 01:06:11 +0000 (21:06 -0400)
Refactoring of the consumer/sessiond interaction so the consumer
supports applications instrumented with libust (UST 2.0).

At this point, more testing of interaction between libust and sessiond
is required.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
43 files changed:
Makefile.am
README
configure.ac
include/Makefile.am
include/ltt-kconsumerd.h [deleted file]
include/lttng-consumerd.h [new file with mode: 0644]
include/lttng-kernel-ctl.h
include/lttng-sessiond-comm.h
include/lttng-ust.h [deleted file]
include/lttng/lttng-consumer.h [new file with mode: 0644]
include/lttng/lttng-kconsumer.h [new file with mode: 0644]
include/lttng/lttng-kconsumerd.h [deleted file]
include/lttng/lttng-ustconsumer.h [new file with mode: 0644]
include/lttng/lttng.h
liblttng-consumer/Makefile.am [new file with mode: 0644]
liblttng-consumer/lttng-consumer.c [new file with mode: 0644]
liblttng-kconsumer/Makefile.am [new file with mode: 0644]
liblttng-kconsumer/lttng-kconsumer.c [new file with mode: 0644]
liblttng-sessiond-comm/lttng-sessiond-comm.c
liblttng-ustconsumer/Makefile.am [new file with mode: 0644]
liblttng-ustconsumer/lttng-ustconsumer.c [new file with mode: 0644]
liblttngkconsumerd/Makefile.am [deleted file]
liblttngkconsumerd/lttngkconsumerd.c [deleted file]
libustctl/Makefile.am [deleted file]
ltt-kconsumerd/Makefile.am [deleted file]
ltt-kconsumerd/ltt-kconsumerd.c [deleted file]
ltt-sessiond/Makefile.am
ltt-sessiond/compat/compat-epoll.c
ltt-sessiond/kernel-ctl.c
ltt-sessiond/lttng-ust-abi.h [new file with mode: 0644]
ltt-sessiond/main.c
ltt-sessiond/trace-kernel.h
ltt-sessiond/trace-ust.c
ltt-sessiond/trace-ust.h
ltt-sessiond/ust-app.h
ltt-sessiond/ust-comm.c
ltt-sessiond/ust-ctl.c
ltt-sessiond/ust-ctl.h
lttng-consumerd/Makefile.am [new file with mode: 0644]
lttng-consumerd/lttng-consumerd.c [new file with mode: 0644]
lttng/commands/version.c
lttng/lttng.c
tests/test_kernel_data_trace.c

index ec8b7fe95ac0a24e471a81a7024d4305a84ef161..9bafeb4818d23906dd9daf7d2779e22446edb94c 100644 (file)
@@ -2,12 +2,13 @@ ACLOCAL_AMFLAGS = -I config
 
 SUBDIRS = liblttng-sessiond-comm \
                  libkernelctl \
-                 liblttngkconsumerd \
+                 liblttng-kconsumer \
+                 liblttng-ustconsumer \
+                 liblttng-consumer \
+                 lttng-consumerd \
                  liblttngctl \
                  libustcomm \
-                 libustctl \
                  lttng \
-                 ltt-kconsumerd \
                  ltt-sessiond \
                  tests \
                  include \
diff --git a/README b/README
index 16e1154bdf33ac15f7ea560fe459bd8c03fc8583..dc7281562858840f0b52f217f256a0c238ec2eee 100644 (file)
--- a/README
+++ b/README
@@ -51,16 +51,16 @@ PACKAGE CONTENTS:
 
     - liblttsessiondcomm
       The ltt-sessiond communication library. In order to talk with ltt-sessiond,
-      thiis library must be used.
+      this library must be used.
 
     - libkernelctl
       Kernel tracer control and ioctl definitions.
 
-    - liblttngkconsumerd
-      Library for Kernel trace consumer.
+    - liblttng-consumer
+      Library for Kernel and (optionally) UST trace consumer.
 
-    - ltt-kconsumerd
-      The Kernel consumer daemon which uses liblttngkconsumerd.
+    - lttng-consumerd
+      The consumer daemon which uses liblttng-consumer.
 
     - ltt-sessiond
       The LTTng session daemon binary.
index c7dd21ec34bd685d0548c7a1e320c6cdb446fa4a..052c5a33d902cd121f4b5cb88f4dcb118eefcfb1 100644 (file)
@@ -6,6 +6,10 @@ AC_CONFIG_MACRO_DIR([config])
 AM_INIT_AUTOMAKE([foreign dist-bzip2 no-dist-gzip])
 AM_SILENT_RULES([yes])
 
+AC_CONFIG_HEADERS([include/config.h])
+
+AH_TEMPLATE([CONFIG_LTTNG_TOOLS_HAVE_UST], [Defined on systems where UST headers can be found.])
+
 AC_CHECK_HEADERS([ \
        sys/types.h unistd.h fcntl.h string.h pthread.h limits.h \
        signal.h stdlib.h sys/un.h sys/socket.h stdlib.h stdio.h \
@@ -41,6 +45,19 @@ AC_CHECK_DECL([rcu_thread_offline], [],
 AC_CHECK_DECL([rcu_thread_online], [],
        [AC_MSG_ERROR([liburcu $liburcu_version or newer is needed])], [[#include <urcu.h>]]
 )
+AC_CHECK_DECL([ustctl_create_session],
+       [
+               AC_DEFINE([CONFIG_LTTNG_TOOLS_HAVE_UST], 1)
+               have_ust_test=1
+       ],
+       [
+               AC_MSG_WARN([UST header not found. Building without UST support.])
+               have_ust_test=0
+       ],
+       [[#include <ust/lttng-ust-ctl.h>]]
+)
+
+AM_CONDITIONAL([LTTNG_TOOLS_HAVE_UST], [ test "x$have_ust_test" = "x1" ])
 
 # Epoll check. If not present, the build will fallback on poll() API
 AX_HAVE_EPOLL(
@@ -70,12 +87,13 @@ AC_CONFIG_FILES([
        Makefile
        include/Makefile
        libkernelctl/Makefile
-       liblttngkconsumerd/Makefile
+       liblttng-consumer/Makefile
+       liblttng-kconsumer/Makefile
+       liblttng-ustconsumer/Makefile
        liblttngctl/Makefile
        liblttng-sessiond-comm/Makefile
-       libustctl/Makefile
        libustcomm/Makefile
-       ltt-kconsumerd/Makefile
+       lttng-consumerd/Makefile
        ltt-sessiond/Makefile
        lttng/Makefile
        tests/Makefile
index 171eefc9dd38e50e47d35dbcb755e82f19f11e81..d53444f4e11d8de351d0ebb6ef72e8e8279af115 100644 (file)
@@ -1,4 +1,5 @@
-lttnginclude_HEADERS = lttng/lttng.h lttng/lttng-kconsumerd.h
+lttnginclude_HEADERS = lttng/lttng.h lttng/lttng-kconsumer.h \
+                       lttng/lttng-ustconsumer.h lttng/lttng-consumer.h
 
-noinst_HEADERS = lttngerr.h lttng-kernel.h ltt-kconsumerd.h lttng-share.h \
-                                lttng-sessiond-comm.h lttng-ust.h lttng-kernel-ctl.h
+noinst_HEADERS = lttngerr.h lttng-kernel.h lttng-consumerd.h lttng-share.h \
+                       lttng-sessiond-comm.h lttng-kernel-ctl.h
diff --git a/include/ltt-kconsumerd.h b/include/ltt-kconsumerd.h
deleted file mode 100644 (file)
index 95c5005..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
- *                      David Goulet <david.goulet@polymtl.ca>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; only verion 2
- * of the License.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
- */
-
-#ifndef _LTT_KCONSUMERD_H
-#define _LTT_KCONSUMERD_H
-
-/* Kernel consumer path */
-#define KCONSUMERD_PATH                     LTTNG_RUNDIR "/kconsumerd"
-#define KCONSUMERD_CMD_SOCK_PATH            KCONSUMERD_PATH "/command"
-#define KCONSUMERD_ERR_SOCK_PATH            KCONSUMERD_PATH "/error"
-
-#endif /* _LTT_KCONSUMERD_H */
diff --git a/include/lttng-consumerd.h b/include/lttng-consumerd.h
new file mode 100644 (file)
index 0000000..8c24030
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
+ *                      David Goulet <david.goulet@polymtl.ca>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; only verion 2
+ * of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+ */
+
+#ifndef _LTTNG_CONSUMERD_H
+#define _LTTNG_CONSUMERD_H
+
+/* Kernel consumer path */
+#define KCONSUMERD_PATH                     LTTNG_RUNDIR "/kconsumerd"
+#define KCONSUMERD_CMD_SOCK_PATH            KCONSUMERD_PATH "/command"
+#define KCONSUMERD_ERR_SOCK_PATH            KCONSUMERD_PATH "/error"
+
+/* UST consumer path */
+#define USTCONSUMERD_PATH                   LTTNG_RUNDIR "/ustconsumerd"
+#define USTCONSUMERD_CMD_SOCK_PATH          USTCONSUMERD_PATH "/command"
+#define USTCONSUMERD_ERR_SOCK_PATH          USTCONSUMERD_PATH "/error"
+
+#endif /* _LTTNG_CONSUMERD_H */
index b51273fac380a3d6613d4f50f12fdcfa5d4c5cf1..fa307e446cd277d7897e81f8ea779390c0d7b577 100644 (file)
@@ -17,8 +17,8 @@
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
  */
 
-#ifndef _LTT_LIBKERNELCTL_H
-#define _LTT_LIBKERNELCTL_H
+#ifndef _LTTNG_KERNEL_CTL_H
+#define _LTTNG_KERNEL_CTL_H
 
 #include <lttng/lttng.h>
 #include <lttng-kernel.h>
@@ -67,4 +67,4 @@ int kernctl_put_subbuf(int fd);
 
 int kernctl_buffer_flush(int fd);
 
-#endif /* _LTT_LIBKERNELCTL_H */
+#endif /* _LTTNG_KERNEL_CTL_H */
index fcdc5a2cf72331234e11dc161e1179e1bb164630..d9919add7d491878f76af02ec7578a517ef79efd 100644 (file)
@@ -28,7 +28,6 @@
 
 #include <limits.h>
 #include <lttng/lttng.h>
-#include <lttng-ust.h>
 
 #define LTTNG_RUNDIR                        "/var/run/lttng"
 
@@ -117,19 +116,19 @@ enum lttcomm_return_code {
        LTTCOMM_UST_SESS_FAIL,                  /* UST create session failed */
        LTTCOMM_UST_CHAN_NOT_FOUND,     /* UST channel not found */
        LTTCOMM_UST_CHAN_FAIL,          /* UST create channel failed */
-       KCONSUMERD_COMMAND_SOCK_READY,  /* when kconsumerd command socket ready */
-       KCONSUMERD_SUCCESS_RECV_FD,             /* success on receiving fds */
-       KCONSUMERD_ERROR_RECV_FD,               /* error on receiving fds */
-       KCONSUMERD_POLL_ERROR,                  /* Error in polling thread in kconsumerd */
-       KCONSUMERD_POLL_NVAL,                   /* Poll on closed fd */
-       KCONSUMERD_POLL_HUP,                    /* All fds have hungup */
-       KCONSUMERD_EXIT_SUCCESS,                /* kconsumerd exiting normally */
-       KCONSUMERD_EXIT_FAILURE,                /* kconsumerd exiting on error */
-       KCONSUMERD_OUTFD_ERROR,                 /* error opening the tracefile */
-       KCONSUMERD_SPLICE_EBADF,                /* EBADF from splice(2) */
-       KCONSUMERD_SPLICE_EINVAL,               /* EINVAL from splice(2) */
-       KCONSUMERD_SPLICE_ENOMEM,               /* ENOMEM from splice(2) */
-       KCONSUMERD_SPLICE_ESPIPE,               /* ESPIPE from splice(2) */
+       CONSUMERD_COMMAND_SOCK_READY,           /* when consumerd command socket ready */
+       CONSUMERD_SUCCESS_RECV_FD,              /* success on receiving fds */
+       CONSUMERD_ERROR_RECV_FD,                /* error on receiving fds */
+       CONSUMERD_POLL_ERROR,                   /* Error in polling thread in kconsumerd */
+       CONSUMERD_POLL_NVAL,                    /* Poll on closed fd */
+       CONSUMERD_POLL_HUP,                     /* All fds have hungup */
+       CONSUMERD_EXIT_SUCCESS,                 /* kconsumerd exiting normally */
+       CONSUMERD_EXIT_FAILURE,                 /* kconsumerd exiting on error */
+       CONSUMERD_OUTFD_ERROR,                  /* error opening the tracefile */
+       CONSUMERD_SPLICE_EBADF,                 /* EBADF from splice(2) */
+       CONSUMERD_SPLICE_EINVAL,                /* EINVAL from splice(2) */
+       CONSUMERD_SPLICE_ENOMEM,                /* ENOMEM from splice(2) */
+       CONSUMERD_SPLICE_ESPIPE,                /* ESPIPE from splice(2) */
        /* MUST be last element */
        LTTCOMM_NR,                                             /* Last element */
 };
@@ -186,26 +185,34 @@ struct lttcomm_lttng_msg {
 };
 
 /*
- * Data structures for the kconsumerd communications
- *
- * The header structure is sent to the kconsumerd daemon to inform
- * how many lttcomm_kconsumerd_msg it is about to receive
+ * lttcomm_consumer_msg is the message sent from sessiond to consumerd
+ * to either add a channel, add a stream, update a stream, or stop
+ * operation.
  */
-struct lttcomm_kconsumerd_header {
-       uint32_t payload_size;
-       uint32_t cmd_type;      /* enum kconsumerd_command */
+struct lttcomm_consumer_msg {
+       uint32_t cmd_type;      /* enum consumerd_command */
+       union {
+               struct {
+                       int channel_key;
+                       uint64_t max_sb_size; /* the subbuffer size for this channel */
+                       /* shm_fd and wait_fd are sent as ancillary data */
+                       uint64_t mmap_len;
+               } channel;
+               struct {
+                       int channel_key;
+                       int stream_key;
+                       /* shm_fd and wait_fd are sent as ancillary data */
+                       uint32_t state;    /* enum lttcomm_consumer_fd_state */
+                       enum lttng_event_output output; /* use splice or mmap to consume this fd */
+                       uint64_t mmap_len;
+                       char path_name[PATH_MAX];
+               } stream;
+       } u;
 };
 
-/* lttcomm_kconsumerd_msg represents a file descriptor to consume the
- * data and a path name to write it
- */
-struct lttcomm_kconsumerd_msg {
-       char path_name[PATH_MAX];
-       int fd;
-       uint32_t state;    /* enum lttcomm_kconsumerd_fd_state */
-       unsigned long max_sb_size; /* the subbuffer size for this channel */
-       enum lttng_event_output output; /* use splice or mmap to consume this fd */
-};
+#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST
+
+#include <ust/lttng-ust-abi.h>
 
 /*
  * Data structure for the commands sent from sessiond to UST.
@@ -240,17 +247,20 @@ struct lttcomm_ust_reply {
        } u;
 };
 
+#endif /* CONFIG_LTTNG_TOOLS_HAVE_UST */
+
 extern int lttcomm_create_unix_sock(const char *pathname);
 extern int lttcomm_connect_unix_sock(const char *pathname);
 extern int lttcomm_accept_unix_sock(int sock);
 extern int lttcomm_listen_unix_sock(int sock);
 extern int lttcomm_close_unix_sock(int sock);
-/* Send fd(s) over a unix socket. */
-extern ssize_t lttcomm_send_fds_unix_sock(int sock, void *buf, int *fds,
-               size_t nb_fd, size_t len);
-/* Recv fd(s) over a unix socket */
-extern ssize_t lttcomm_recv_fds_unix_sock(int sock, void *buf, int *fds,
-               size_t nb_fd, size_t len);
+
+#define LTTCOMM_MAX_SEND_FDS   4
+/* Send a message accompanied by fd(s) over a unix socket. */
+extern ssize_t lttcomm_send_fds_unix_sock(int sock, int *fds, size_t nb_fd);
+/* Recv a message accompanied by fd(s) from a unix socket */
+extern ssize_t lttcomm_recv_fds_unix_sock(int sock, int *fds, size_t nb_fd);
+
 extern ssize_t lttcomm_recv_unix_sock(int sock, void *buf, size_t len);
 extern ssize_t lttcomm_send_unix_sock(int sock, void *buf, size_t len);
 extern const char *lttcomm_get_readable_code(enum lttcomm_return_code code);
diff --git a/include/lttng-ust.h b/include/lttng-ust.h
deleted file mode 100644 (file)
index b7143f8..0000000
+++ /dev/null
@@ -1,109 +0,0 @@
-#ifndef _LTTNG_UST_H
-#define _LTTNG_UST_H
-
-/*
- * Taken from the lttng-ust-abi.h in the UST 2.0 git tree
- *
- * Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- * Copyright 2011 - David Goulet <david.goulet@polymtl.ca>
- *
- * LTTng-UST ABI header
- *
- * Dual LGPL v2.1/GPL v2 license.
- */
-
-#include <stdint.h>
-
-#define LTTNG_UST_SYM_NAME_LEN         128
-
-#define LTTNG_UST_COMM_VERSION_MAJOR   0
-#define LTTNG_UST_COMM_VERSION_MINOR   1
-
-enum lttng_ust_instrumentation {
-       LTTNG_UST_TRACEPOINT    = 0,
-       LTTNG_UST_PROBE         = 1,
-       LTTNG_UST_FUNCTION      = 2,
-};
-
-enum lttng_ust_output {
-       LTTNG_UST_MMAP          = 0,
-};
-
-struct lttng_ust_tracer_version {
-       uint32_t version;
-       uint32_t patchlevel;
-       uint32_t sublevel;
-};
-
-struct lttng_ust_channel {
-       int overwrite;                          /* 1: overwrite, 0: discard */
-       uint64_t subbuf_size;                   /* in bytes */
-       uint64_t num_subbuf;
-       unsigned int switch_timer_interval;     /* usecs */
-       unsigned int read_timer_interval;       /* usecs */
-       enum lttng_ust_output output;           /* output mode */
-       /* The following fields are used internally within UST. */
-       int shm_fd;
-       int wait_fd;
-       uint64_t memory_map_size;
-};
-
-struct lttng_ust_event {
-       char name[LTTNG_UST_SYM_NAME_LEN];      /* event name */
-       enum lttng_ust_instrumentation instrumentation;
-       /* Per instrumentation type configuration */
-       union {
-       } u;
-};
-
-enum lttng_ust_context_type {
-       LTTNG_UST_CONTEXT_VTID      = 0,
-};
-
-struct lttng_ust_context {
-       enum lttng_ust_context_type ctx;
-       union {
-       } u;
-};
-
-#define _UST_CMD(minor)                 (minor)
-#define _UST_CMDR(minor, type)          (minor)
-#define _UST_CMDW(minor, type)          (minor)
-
-/* Handled by object descriptor */
-#define LTTNG_UST_RELEASE               _UST_CMD(0x1)
-
-/* Handled by object cmd */
-
-/* LTTng-UST commands */
-#define LTTNG_UST_SESSION               _UST_CMD(0x40)
-#define LTTNG_UST_TRACER_VERSION        \
-       _UST_CMDR(0x41, struct lttng_ust_tracer_version)
-#define LTTNG_UST_TRACEPOINT_LIST       _UST_CMD(0x42)
-#define LTTNG_UST_WAIT_QUIESCENT        _UST_CMD(0x43)
-#define LTTNG_UST_REGISTER_DONE         _UST_CMD(0x44)
-
-/* Session FD ioctl */
-#define LTTNG_UST_METADATA             \
-       _UST_CMDW(0x50, struct lttng_ust_channel)
-#define LTTNG_UST_CHANNEL              \
-       _UST_CMDW(0x51, struct lttng_ust_channel)
-#define LTTNG_UST_SESSION_START        _UST_CMD(0x52)
-#define LTTNG_UST_SESSION_STOP         _UST_CMD(0x53)
-
-/* Channel FD ioctl */
-#define LTTNG_UST_STREAM               _UST_CMD(0x60)
-#define LTTNG_UST_EVENT                \
-       _UST_CMDW(0x61, struct lttng_ust_event)
-
-/* Event and Channel FD ioctl */
-#define LTTNG_UST_CONTEXT              \
-       _UST_CMDW(0x70, struct lttng_ust_context)
-
-/* Event, Channel and Session ioctl */
-#define LTTNG_UST_ENABLE               _UST_CMD(0x80)
-#define LTTNG_UST_DISABLE              _UST_CMD(0x81)
-
-#define LTTNG_UST_ROOT_HANDLE          0
-
-#endif /* _LTTNG_UST_H */
diff --git a/include/lttng/lttng-consumer.h b/include/lttng/lttng-consumer.h
new file mode 100644 (file)
index 0000000..7ca94cc
--- /dev/null
@@ -0,0 +1,297 @@
+/*
+ * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ */
+
+#ifndef _LTTNG_CONSUMER_H
+#define _LTTNG_CONSUMER_H
+
+#include <limits.h>
+#include <poll.h>
+#include <urcu/list.h>
+#include <lttng/lttng.h>
+
+/*
+ * When the receiving thread dies, we need to have a way to make the polling
+ * thread exit eventually. If all FDs hang up (normal case when the
+ * ltt-sessiond stops), we can exit cleanly, but if there is a problem and for
+ * whatever reason some FDs remain open, the consumer should still exit
+ * eventually.
+ *
+ * If the timeout is reached, it means that during this period no events
+ * occurred on the FDs so we need to force an exit. This case should not happen
+ * but it is a safety to ensure we won't block the consumer indefinitely.
+ *
+ * The value of 2 seconds is an arbitrary choice.
+ */
+#define LTTNG_CONSUMER_POLL_TIMEOUT 2000
+
+/* Commands for consumer */
+enum lttng_consumer_command {
+       LTTNG_CONSUMER_ADD_CHANNEL,
+       LTTNG_CONSUMER_ADD_STREAM,
+       /* pause, delete, active depending on fd state */
+       LTTNG_CONSUMER_UPDATE_STREAM,
+       /* inform the consumer to quit when all fd has hang up */
+       LTTNG_CONSUMER_STOP,
+};
+
+/* State of each fd in consumer */
+enum lttng_consumer_stream_state {
+       LTTNG_CONSUMER_ACTIVE_STREAM,
+       LTTNG_CONSUMER_PAUSE_STREAM,
+       LTTNG_CONSUMER_DELETE_STREAM,
+};
+
+struct lttng_consumer_channel_list {
+       struct cds_list_head head;
+};
+
+struct lttng_consumer_stream_list {
+       struct cds_list_head head;
+};
+
+enum lttng_consumer_type {
+       LTTNG_CONSUMER_UNKNOWN = 0,
+       LTTNG_CONSUMER_KERNEL,
+       LTTNG_CONSUMER_UST,
+};
+
+struct lttng_consumer_channel {
+       struct cds_list_head list;
+       int key;
+       uint64_t max_sb_size; /* the subbuffer size for this channel */
+       int refcount; /* Number of streams referencing this channel */
+       /* For UST */
+       int shm_fd;
+       int wait_fd;
+       void *mmap_base;
+       size_t mmap_len;
+       struct shm_handle *handle;
+       int nr_streams;
+};
+
+/* Forward declaration for UST. */
+struct lib_ring_buffer;
+
+/*
+ * Internal representation of the streams, sessiond_key is used to identify
+ * uniquely a stream.
+ */
+struct lttng_consumer_stream {
+       struct cds_list_head list;
+       struct lttng_consumer_channel *chan;    /* associated channel */
+       /*
+        * key is the key used by the session daemon to refer to the
+        * object in the consumer daemon.
+        */
+       int key;
+       int shm_fd;
+       int wait_fd;
+       int out_fd; /* output file to write the data */
+       off_t out_fd_offset; /* write position in the output file descriptor */
+       char path_name[PATH_MAX]; /* tracefile name */
+       enum lttng_consumer_stream_state state;
+       size_t shm_len;
+       void *mmap_base;
+       size_t mmap_len;
+       enum lttng_event_output output; /* splice or mmap */
+       /* For UST */
+       struct lib_ring_buffer *buf;
+       int cpu;
+};
+
+/*
+ * UST consumer local data to the program. One or more instance per
+ * process.
+ */
+struct lttng_consumer_local_data {
+       /* function to call when data is available on a buffer */
+       int (*on_buffer_ready)(struct lttng_consumer_stream *stream);
+       /*
+        * function to call when we receive a new channel, it receives a
+        * newly allocated channel, depending on the return code of this
+        * function, the new channel will be handled by the application
+        * or the library.
+        *
+        * Returns:
+        *    > 0 (success, FD is kept by application)
+        *   == 0 (success, FD is left to library)
+        *    < 0 (error)
+        */
+       int (*on_recv_channel)(struct lttng_consumer_channel *channel);
+       /*
+        * function to call when we receive a new stream, it receives a
+        * newly allocated stream, depending on the return code of this
+        * function, the new stream will be handled by the application
+        * or the library.
+        *
+        * Returns:
+        *    > 0 (success, FD is kept by application)
+        *   == 0 (success, FD is left to library)
+        *    < 0 (error)
+        */
+       int (*on_recv_stream)(struct lttng_consumer_stream *stream);
+       /*
+        * function to call when a stream is getting updated by the session
+        * daemon, this function receives the sessiond key and the new
+        * state, depending on the return code of this function the
+        * update of state for the stream is handled by the application
+        * or the library.
+        *
+        * Returns:
+        *    > 0 (success, FD is kept by application)
+        *   == 0 (success, FD is left to library)
+        *    < 0 (error)
+        */
+       int (*on_update_stream)(int sessiond_key, uint32_t state);
+       /* socket to communicate errors with sessiond */
+       int consumer_error_socket;
+       /* socket to exchange commands with sessiond */
+       char *consumer_command_sock_path;
+       /* communication with splice */
+       int consumer_thread_pipe[2];
+       /* pipe to wake the poll thread when necessary */
+       int consumer_poll_pipe[2];
+       /* to let the signal handler wake up the fd receiver thread */
+       int consumer_should_quit[2];
+};
+
+/*
+ * Library-level data. One instance per process.
+ */
+struct lttng_consumer_global_data {
+       /*
+        * consumer_data.lock protects consumer_data.fd_list,
+        * consumer_data.stream_count, and consumer_data.need_update. It
+        * ensures the count matches the number of items in the fd_list.
+        * It ensures the list updates *always* trigger an fd_array
+        * update (therefore need to make list update vs
+        * consumer_data.need_update flag update atomic, and also flag
+        * read, fd array and flag clear atomic).
+        */
+       pthread_mutex_t lock;
+       /*
+        * Number of streams in the list below. Protected by
+        * consumer_data.lock.
+        */
+       int stream_count;
+       /*
+        * Lists of streams and channels. Protected by consumer_data.lock.
+        */
+       struct lttng_consumer_stream_list stream_list;
+       struct lttng_consumer_channel_list channel_list;
+       /*
+        * Flag specifying if the local array of FDs needs update in the
+        * poll function. Protected by consumer_data.lock.
+        */
+       unsigned int need_update;
+       enum lttng_consumer_type type;
+};
+
+/*
+ * Set the error socket for communication with a session daemon.
+ */
+extern void lttng_consumer_set_error_sock(
+               struct lttng_consumer_local_data *ctx, int sock);
+
+/*
+ * Set the command socket path for communication with a session daemon.
+ */
+extern void lttng_consumer_set_command_sock_path(
+               struct lttng_consumer_local_data *ctx, char *sock);
+
+/*
+ * Send return code to session daemon.
+ *
+ * Returns the return code of sendmsg : the number of bytes transmitted or -1
+ * on error.
+ */
+extern int lttng_consumer_send_error(
+               struct lttng_consumer_local_data *ctx, int cmd);
+
+/*
+ * Called from signal handler to ensure a clean exit.
+ */
+extern void lttng_consumer_should_exit(
+               struct lttng_consumer_local_data *ctx);
+
+/*
+ * Cleanup the daemon's socket on exit.
+ */
+extern void lttng_consumer_cleanup(void);
+
+/*
+ * Flush pending writes to trace output disk file.
+ */
+extern void lttng_consumer_sync_trace_file(
+               struct lttng_consumer_stream *stream, off_t orig_offset);
+
+/*
+ * Poll on the should_quit pipe and the command socket return -1 on error and
+ * should exit, 0 if data is available on the command socket
+ */
+extern int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
+
+extern int consumer_update_poll_array(
+               struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
+               struct lttng_consumer_stream **local_consumer_streams);
+
+extern struct lttng_consumer_stream *consumer_allocate_stream(
+               int channel_key, int stream_key,
+               int shm_fd, int wait_fd,
+               enum lttng_consumer_stream_state state,
+               uint64_t mmap_len,
+               enum lttng_event_output output,
+               const char *path_name);
+extern int consumer_add_stream(struct lttng_consumer_stream *stream);
+extern void consumer_del_stream(struct lttng_consumer_stream *stream);
+extern void consumer_change_stream_state(int stream_key,
+               enum lttng_consumer_stream_state state);
+extern void consumer_del_channel(struct lttng_consumer_channel *channel);
+extern struct lttng_consumer_channel *consumer_allocate_channel(
+               int channel_key,
+               int shm_fd, int wait_fd,
+               uint64_t mmap_len,
+               uint64_t max_sb_size);
+int consumer_add_channel(struct lttng_consumer_channel *channel);
+
+extern struct lttng_consumer_local_data *lttng_consumer_create(
+               enum lttng_consumer_type type,
+               int (*buffer_ready)(struct lttng_consumer_stream *stream),
+               int (*recv_channel)(struct lttng_consumer_channel *channel),
+               int (*recv_stream)(struct lttng_consumer_stream *stream),
+               int (*update_stream)(int sessiond_key, uint32_t state));
+extern void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
+extern int lttng_consumer_on_read_subbuffer_mmap(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len);
+extern int lttng_consumer_on_read_subbuffer_splice(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len);
+extern int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream);
+extern int lttng_consumer_get_produced_snapshot(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream,
+               unsigned long *pos);
+extern void *lttng_consumer_thread_poll_fds(void *data);
+extern void *lttng_consumer_thread_receive_fds(void *data);
+extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
+               int sock, struct pollfd *consumer_sockpoll);
+
+#endif /* _LTTNG_CONSUMER_H */
diff --git a/include/lttng/lttng-kconsumer.h b/include/lttng/lttng-kconsumer.h
new file mode 100644 (file)
index 0000000..764a3ef
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ */
+
+#ifndef _LTTNG_KCONSUMER_H
+#define _LTTNG_KCONSUMER_H
+
+#include <lttng/lttng-consumer.h>
+
+/*
+ * Mmap the ring buffer, read it and write the data to the tracefile.
+ *
+ * Returns the number of bytes written.
+ */
+extern int lttng_kconsumer_on_read_subbuffer_mmap(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len);
+
+/*
+ * Splice the data from the ring buffer to the tracefile.
+ *
+ * Returns the number of bytes spliced.
+ */
+extern int lttng_kconsumer_on_read_subbuffer_splice(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len);
+
+/*
+ * Take a snapshot for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
+        struct lttng_consumer_stream *stream);
+
+/*
+ * Get the produced position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_get_produced_snapshot(
+        struct lttng_consumer_local_data *ctx,
+        struct lttng_consumer_stream *stream,
+        unsigned long *pos);
+
+int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
+               int sock, struct pollfd *consumer_sockpoll);
+
+#endif /* _LTTNG_KCONSUMER_H */
diff --git a/include/lttng/lttng-kconsumerd.h b/include/lttng/lttng-kconsumerd.h
deleted file mode 100644 (file)
index 98771de..0000000
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; only version 2
- * of the License.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
- */
-
-#ifndef _LTTNG_KCONSUMERD_H
-#define _LTTNG_KCONSUMERD_H
-
-#include <limits.h>
-#include <lttng/lttng.h>
-#include <poll.h>
-#include <urcu/list.h>
-
-/*
- * When the receiving thread dies, we need to have a way to make the polling
- * thread exit eventually. If all FDs hang up (normal case when the
- * ltt-sessiond stops), we can exit cleanly, but if there is a problem and for
- * whatever reason some FDs remain open, the consumer should still exit
- * eventually.
- *
- * If the timeout is reached, it means that during this period no events
- * occurred on the FDs so we need to force an exit. This case should not happen
- * but it is a safety to ensure we won't block the consumer indefinitely.
- *
- * The value of 2 seconds is an arbitrary choice.
- */
-#define LTTNG_KCONSUMERD_POLL_GRACE_PERIOD 2000
-
-/* Commands for kconsumerd */
-enum lttng_kconsumerd_command {
-       ADD_STREAM,
-       UPDATE_STREAM, /* pause, delete, active depending on fd state */
-       STOP, /* inform the kconsumerd to quit when all fd has hang up */
-};
-
-/* State of each fd in consumerd */
-enum lttng_kconsumerd_fd_state {
-       ACTIVE_FD,
-       PAUSE_FD,
-       DELETE_FD,
-};
-
-struct lttng_kconsumerd_fd_list {
-       struct cds_list_head head;
-};
-
-/*
- * Internal representation of the FDs, sessiond_fd is used to identify uniquely
- * a fd
- */
-struct lttng_kconsumerd_fd {
-       struct cds_list_head list;
-       int sessiond_fd; /* used to identify uniquely a fd with sessiond */
-       int consumerd_fd; /* fd to consume */
-       int out_fd; /* output file to write the data */
-       off_t out_fd_offset; /* write position in the output file descriptor */
-       char path_name[PATH_MAX]; /* tracefile name */
-       enum lttng_kconsumerd_fd_state state;
-       unsigned long max_sb_size; /* the subbuffer size for this channel */
-       void *mmap_base;
-       size_t mmap_len;
-       enum lttng_event_output output; /* splice or mmap */
-};
-
-/*
- * Kernel consumer local data to the program.
- */
-struct lttng_kconsumerd_local_data {
-       /* function to call when data is available on a buffer */
-       int (*on_buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd);
-       /*
-        * function to call when we receive a new fd, it receives a
-        * newly allocated kconsumerd_fd, depending on the return code
-        * of this function, the new FD will be handled by the
-        * application or the library.
-        *
-        * Returns:
-        *    > 0 (success, FD is kept by application)
-        *   == 0 (success, FD is left to library)
-        *    < 0 (error)
-        */
-       int (*on_recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd);
-       /*
-        * function to call when a FD is getting updated by the session
-        * daemon, this function receives the FD as seen by the session
-        * daemon (sessiond_fd) and the new state, depending on the
-        * return code of this function the update of state for the FD
-        * is handled by the application or the library.
-        *
-        * Returns:
-        *    > 0 (success, FD is kept by application)
-        *   == 0 (success, FD is left to library)
-        *    < 0 (error)
-        */
-       int (*on_update_fd)(int sessiond_fd, uint32_t state);
-       /* socket to communicate errors with sessiond */
-       int kconsumerd_error_socket;
-       /* socket to exchange commands with sessiond */
-       char *kconsumerd_command_sock_path;
-       /* communication with splice */
-       int kconsumerd_thread_pipe[2];
-       /* pipe to wake the poll thread when necessary */
-       int kconsumerd_poll_pipe[2];
-       /* to let the signal handler wake up the fd receiver thread */
-       int kconsumerd_should_quit[2];
-};
-
-/*
- * Initialise the necessary environnement:
- * - create a new context
- * - create the poll_pipe
- * - create the should_quit pipe (for signal handler)
- * - create the thread pipe (for splice)
- *
- * Takes the function pointers to the on_buffer_ready, on_recv_fd, and
- * on_update_fd callbacks.
- *
- * Returns a pointer to the new context or NULL on error.
- */
-extern struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
-               int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
-               int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
-               int (*update_fd)(int sessiond_fd, uint32_t state));
-
-/*
- * Close all fds associated with the instance and free the context.
- */
-extern void lttng_kconsumerd_destroy(struct lttng_kconsumerd_local_data *ctx);
-
-/*
- * Mmap the ring buffer, read it and write the data to the tracefile.
- *
- * Returns the number of bytes written.
- */
-extern int lttng_kconsumerd_on_read_subbuffer_mmap(
-               struct lttng_kconsumerd_local_data *ctx,
-               struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len);
-
-/*
- * Splice the data from the ring buffer to the tracefile.
- *
- * Returns the number of bytes spliced.
- */
-extern int lttng_kconsumerd_on_read_subbuffer_splice(
-               struct lttng_kconsumerd_local_data *ctx,
-               struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len);
-
-/*
- * Take a snapshot for a specific fd
- *
- * Returns 0 on success, < 0 on error
- */
-int lttng_kconsumerd_take_snapshot(struct lttng_kconsumerd_local_data *ctx,
-        struct lttng_kconsumerd_fd *kconsumerd_fd);
-
-/*
- * Get the produced position
- *
- * Returns 0 on success, < 0 on error
- */
-int lttng_kconsumerd_get_produced_snapshot(
-        struct lttng_kconsumerd_local_data *ctx,
-        struct lttng_kconsumerd_fd *kconsumerd_fd,
-        unsigned long *pos);
-
-/*
- * Send return code to session daemon.
- *
- * Returns the return code of sendmsg : the number of bytes transmitted or -1
- * on error.
- */
-extern int lttng_kconsumerd_send_error(
-               struct lttng_kconsumerd_local_data *ctx, int cmd);
-
-/*
- * Poll on the should_quit pipe and the command socket return -1 on error and
- * should exit, 0 if data is available on the command socket.
- */
-extern int lttng_kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll);
-
-/*
- * This thread polls the fds in the ltt_fd_list to consume the data and write
- * it to tracefile if necessary.
- */
-extern void *lttng_kconsumerd_thread_poll_fds(void *data);
-
-/*
- * This thread listens on the consumerd socket and receives the file
- * descriptors from ltt-sessiond.
- */
-extern void *lttng_kconsumerd_thread_receive_fds(void *data);
-
-/*
- * Called from signal handler to ensure a clean exit.
- */
-extern void lttng_kconsumerd_should_exit(
-               struct lttng_kconsumerd_local_data *ctx);
-
-/*
- * Cleanup the daemon's socket on exit.
- */
-extern void lttng_kconsumerd_cleanup(void);
-
-/*
- * Set the error socket for communication with a session daemon.
- */
-extern void lttng_kconsumerd_set_error_sock(
-               struct lttng_kconsumerd_local_data *ctx, int sock);
-
-/*
- * Set the command socket path for communication with a session daemon.
- */
-extern void lttng_kconsumerd_set_command_sock_path(
-               struct lttng_kconsumerd_local_data *ctx, char *sock);
-
-#endif /* _LTTNG_KCONSUMERD_H */
diff --git a/include/lttng/lttng-ustconsumer.h b/include/lttng/lttng-ustconsumer.h
new file mode 100644 (file)
index 0000000..0d77a89
--- /dev/null
@@ -0,0 +1,132 @@
+/*
+ * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ */
+
+#ifndef _LTTNG_USTCONSUMER_H
+#define _LTTNG_USTCONSUMER_H
+
+#include <config.h>
+#include <lttng/lttng-consumer.h>
+#include <errno.h>
+
+#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST
+
+/*
+ * Mmap the ring buffer, read it and write the data to the tracefile.
+ *
+ * Returns the number of bytes written.
+ */
+extern int lttng_ustconsumer_on_read_subbuffer_mmap(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len);
+
+/* Not implemented */
+extern int lttng_ustconsumer_on_read_subbuffer_splice(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len);
+
+/*
+ * Take a snapshot for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_ustconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
+        struct lttng_consumer_stream *stream);
+
+/*
+ * Get the produced position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_ustconsumer_get_produced_snapshot(
+        struct lttng_consumer_local_data *ctx,
+        struct lttng_consumer_stream *stream,
+        unsigned long *pos);
+
+int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
+               int sock, struct pollfd *consumer_sockpoll);
+
+extern int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan);
+extern void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan);
+extern int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream);
+extern void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream);
+
+
+#else /* CONFIG_LTTNG_TOOLS_HAVE_UST */
+
+static inline
+int lttng_ustconsumer_on_read_subbuffer_mmap(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len)
+{
+       return -ENOSYS;
+}
+
+static inline
+int lttng_ustconsumer_on_read_subbuffer_splice(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *uststream, unsigned long len)
+{
+       return -ENOSYS;
+}
+
+static inline
+int lttng_ustconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
+        struct lttng_consumer_stream *stream)
+{
+       return -ENOSYS;
+}
+
+static inline
+int lttng_ustconsumer_get_produced_snapshot(
+        struct lttng_consumer_local_data *ctx,
+        struct lttng_consumer_stream *stream,
+        unsigned long *pos)
+{
+       return -ENOSYS;
+}
+
+static inline
+int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
+               int sock, struct pollfd *consumer_sockpoll);
+
+static inline
+int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
+{
+       return -ENOSYS;
+}
+
+static inline
+void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
+{
+}
+
+static inline
+int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
+{
+       return -ENOSYS;
+}
+
+static inline
+void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
+{
+}
+
+#endif /* CONFIG_LTTNG_TOOLS_HAVE_UST */
+
+#endif /* _LTTNG_USTCONSUMER_H */
index 84f814793cf74352a992abc195b227620267df01..62b3ca7c8608d09e9a46745e3bec981edf594696 100644 (file)
@@ -300,7 +300,7 @@ extern const char *lttng_get_readable_code(int code);
  * domain. No consumer will be spawned and all fds/commands will go through the
  * socket path given (socket_path).
  *
- * NOTE: At the moment, if you use the liblttngkconsumerd, you can only use the
+ * NOTE: At the moment, if you use the liblttng-kconsumer, you can only use the
  * command socket. The error socket is not supported yet for roaming consumers.
  */
 extern int lttng_register_consumer(struct lttng_handle *handle,
diff --git a/liblttng-consumer/Makefile.am b/liblttng-consumer/Makefile.am
new file mode 100644 (file)
index 0000000..e3aa02a
--- /dev/null
@@ -0,0 +1,15 @@
+AM_CPPFLAGS = -I$(top_srcdir)/include
+
+lib_LTLIBRARIES = liblttng-consumer.la
+
+liblttng_consumer_la_SOURCES = lttng-consumer.c
+
+liblttng_consumer_la_LIBADD = \
+               $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la \
+               $(top_builddir)/liblttng-kconsumer/liblttng-kconsumer.la
+
+
+if LTTNG_TOOLS_HAVE_UST
+liblttng_consumer_la_LIBADD += \
+               $(top_builddir)/liblttng-ustconsumer/liblttng-ustconsumer.la
+endif
diff --git a/liblttng-consumer/lttng-consumer.c b/liblttng-consumer/lttng-consumer.c
new file mode 100644 (file)
index 0000000..f031d5a
--- /dev/null
@@ -0,0 +1,1002 @@
+/*
+ * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
+ *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+ */
+
+#define _GNU_SOURCE
+#include <assert.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <lttng-kernel-ctl.h>
+#include <lttng-sessiond-comm.h>
+#include <lttng/lttng-consumer.h>
+#include <lttng/lttng-kconsumer.h>
+#include <lttng/lttng-ustconsumer.h>
+#include <lttngerr.h>
+
+struct lttng_consumer_global_data consumer_data = {
+       .stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head),
+       .channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head),
+       .stream_count = 0,
+       .need_update = 1,
+       .type = LTTNG_CONSUMER_UNKNOWN,
+};
+
+/* timeout parameter, to control the polling thread grace period. */
+int consumer_poll_timeout = -1;
+
+/*
+ * Flag to inform the polling thread to quit when all fd hung up. Updated by
+ * the consumer_thread_receive_fds when it notices that all fds has hung up.
+ * Also updated by the signal handler (consumer_should_exit()). Read by the
+ * polling threads.
+ */
+volatile int consumer_quit = 0;
+
+/*
+ * Find a stream. The consumer_data.lock must be locked during this
+ * call.
+ */
+static struct lttng_consumer_stream *consumer_find_stream(int key)
+{
+       struct lttng_consumer_stream *iter;
+
+       cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
+               if (iter->key == key) {
+                       DBG("Found stream key %d", key);
+                       return iter;
+               }
+       }
+       return NULL;
+}
+
+static struct lttng_consumer_channel *consumer_find_channel(int key)
+{
+       struct lttng_consumer_channel *iter;
+
+       cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
+               if (iter->key == key) {
+                       DBG("Found channel key %d", key);
+                       return iter;
+               }
+       }
+       return NULL;
+}
+
+/*
+ * Remove a stream from the global list protected by a mutex. This
+ * function is also responsible for freeing its data structures.
+ */
+void consumer_del_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+       struct lttng_consumer_channel *free_chan = NULL;
+
+       pthread_mutex_lock(&consumer_data.lock);
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               if (stream->mmap_base != NULL) {
+                       ret = munmap(stream->mmap_base, stream->mmap_len);
+                       if (ret != 0) {
+                               perror("munmap");
+                       }
+               }
+               break;
+       case LTTNG_CONSUMER_UST:
+               lttng_ustconsumer_del_stream(stream);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               goto end;
+       }
+
+       cds_list_del(&stream->list);
+       if (consumer_data.stream_count <= 0) {
+               goto end;
+       }
+       consumer_data.stream_count--;
+       if (!stream) {
+               goto end;
+       }
+       if (stream->out_fd >= 0) {
+               close(stream->out_fd);
+       }
+       if (stream->wait_fd >= 0) {
+               close(stream->wait_fd);
+       }
+       if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
+               close(stream->shm_fd);
+       }
+       if (!--stream->chan->refcount)
+               free_chan = stream->chan;
+       free(stream);
+end:
+       consumer_data.need_update = 1;
+       pthread_mutex_unlock(&consumer_data.lock);
+
+       if (free_chan)
+               consumer_del_channel(free_chan);
+}
+
+struct lttng_consumer_stream *consumer_allocate_stream(
+               int channel_key, int stream_key,
+               int shm_fd, int wait_fd,
+               enum lttng_consumer_stream_state state,
+               uint64_t mmap_len,
+               enum lttng_event_output output,
+               const char *path_name)
+{
+       struct lttng_consumer_stream *stream;
+       int ret;
+
+       stream = malloc(sizeof(*stream));
+       if (stream == NULL) {
+               perror("malloc struct lttng_consumer_stream");
+               goto end;
+       }
+       stream->chan = consumer_find_channel(channel_key);
+       if (!stream->chan) {
+               perror("Unable to find channel key");
+               goto end;
+       }
+       stream->chan->refcount++;
+       stream->key = stream_key;
+       stream->shm_fd = shm_fd;
+       stream->wait_fd = wait_fd;
+       stream->out_fd = -1;
+       stream->out_fd_offset = 0;
+       stream->state = state;
+       stream->mmap_len = mmap_len;
+       stream->mmap_base = NULL;
+       stream->output = output;
+       strncpy(stream->path_name, path_name, PATH_MAX - 1);
+       stream->path_name[PATH_MAX - 1] = '\0';
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER_UST:
+               ret = lttng_ustconsumer_allocate_stream(stream);
+               if (ret) {
+                       free(stream);
+                       return NULL;
+               }
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               goto end;
+       }
+       DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)",
+                       stream->path_name, stream->key,
+                       stream->shm_fd,
+                       stream->wait_fd,
+                       (unsigned long long) stream->mmap_len,
+                       stream->out_fd);
+end:
+       return stream;
+}
+
+/*
+ * Add a stream to the global list protected by a mutex.
+ */
+int consumer_add_stream(struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+
+       pthread_mutex_lock(&consumer_data.lock);
+       /* Check if already exist */
+       if (consumer_find_stream(stream->key)) {
+               ret = -1;
+               goto end;
+       }
+       cds_list_add(&stream->list, &consumer_data.stream_list.head);
+       consumer_data.stream_count++;
+       consumer_data.need_update = 1;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER_UST:
+               /* Streams are in CPU number order (we rely on this) */
+               stream->cpu = stream->chan->nr_streams++;
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               goto end;
+       }
+
+end:
+       pthread_mutex_unlock(&consumer_data.lock);
+       return ret;
+}
+
+/*
+ * Update a stream according to what we just received.
+ */
+void consumer_change_stream_state(int stream_key,
+               enum lttng_consumer_stream_state state)
+{
+       struct lttng_consumer_stream *stream;
+
+       pthread_mutex_lock(&consumer_data.lock);
+       stream = consumer_find_stream(stream_key);
+       if (stream) {
+               stream->state = state;
+       }
+       consumer_data.need_update = 1;
+       pthread_mutex_unlock(&consumer_data.lock);
+}
+
+/*
+ * Remove a channel from the global list protected by a mutex. This
+ * function is also responsible for freeing its data structures.
+ */
+void consumer_del_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       pthread_mutex_lock(&consumer_data.lock);
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER_UST:
+               lttng_ustconsumer_del_channel(channel);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               goto end;
+       }
+
+       cds_list_del(&channel->list);
+       if (channel->mmap_base != NULL) {
+               ret = munmap(channel->mmap_base, channel->mmap_len);
+               if (ret != 0) {
+                       perror("munmap");
+               }
+       }
+       if (channel->wait_fd >= 0) {
+               close(channel->wait_fd);
+       }
+       if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
+               close(channel->shm_fd);
+       }
+       free(channel);
+end:
+       pthread_mutex_unlock(&consumer_data.lock);
+}
+
+struct lttng_consumer_channel *consumer_allocate_channel(
+               int channel_key,
+               int shm_fd, int wait_fd,
+               uint64_t mmap_len,
+               uint64_t max_sb_size)
+{
+       struct lttng_consumer_channel *channel;
+       int ret;
+
+       channel = malloc(sizeof(*channel));
+       if (channel == NULL) {
+               perror("malloc struct lttng_consumer_channel");
+               goto end;
+       }
+       channel->key = channel_key;
+       channel->shm_fd = shm_fd;
+       channel->wait_fd = wait_fd;
+       channel->mmap_len = mmap_len;
+       channel->max_sb_size = max_sb_size;
+       channel->refcount = 0;
+       channel->nr_streams = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               channel->mmap_base = NULL;
+               channel->mmap_len = 0;
+               break;
+       case LTTNG_CONSUMER_UST:
+               ret = lttng_ustconsumer_allocate_channel(channel);
+               if (ret) {
+                       free(channel);
+                       return NULL;
+               }
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               goto end;
+       }
+       DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
+                       channel->key,
+                       channel->shm_fd,
+                       channel->wait_fd,
+                       (unsigned long long) channel->mmap_len,
+                       (unsigned long long) channel->max_sb_size);
+end:
+       return channel;
+}
+
+/*
+ * Add a channel to the global list protected by a mutex.
+ */
+int consumer_add_channel(struct lttng_consumer_channel *channel)
+{
+       int ret = 0;
+
+       pthread_mutex_lock(&consumer_data.lock);
+       /* Check if already exist */
+       if (consumer_find_channel(channel->key)) {
+               ret = -1;
+               goto end;
+       }
+       cds_list_add(&channel->list, &consumer_data.channel_list.head);
+end:
+       pthread_mutex_unlock(&consumer_data.lock);
+       return ret;
+}
+
+/*
+ * Allocate the pollfd structure and the local view of the out fds to avoid
+ * doing a lookup in the linked list and concurrency issues when writing is
+ * needed. Called with consumer_data.lock held.
+ *
+ * Returns the number of fds in the structures.
+ */
+int consumer_update_poll_array(
+               struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
+               struct lttng_consumer_stream **local_stream)
+{
+       struct lttng_consumer_stream *iter;
+       int i = 0;
+
+       DBG("Updating poll fd array");
+       cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
+               if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
+                       continue;
+               }
+               DBG("Active FD %d", iter->wait_fd);
+               (*pollfd)[i].fd = iter->wait_fd;
+               (*pollfd)[i].events = POLLIN | POLLPRI;
+               local_stream[i] = iter;
+               i++;
+       }
+
+       /*
+        * Insert the consumer_poll_pipe at the end of the array and don't
+        * increment i so nb_fd is the number of real FD.
+        */
+       (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
+       (*pollfd)[i].events = POLLIN;
+       return i;
+}
+
+/*
+ * Poll on the should_quit pipe and the command socket return -1 on error and
+ * should exit, 0 if data is available on the command socket
+ */
+int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
+{
+       int num_rdy;
+
+       num_rdy = poll(consumer_sockpoll, 2, -1);
+       if (num_rdy == -1) {
+               perror("Poll error");
+               goto exit;
+       }
+       if (consumer_sockpoll[0].revents == POLLIN) {
+               DBG("consumer_should_quit wake up");
+               goto exit;
+       }
+       return 0;
+
+exit:
+       return -1;
+}
+
+/*
+ * Set the error socket.
+ */
+void lttng_consumer_set_error_sock(
+               struct lttng_consumer_local_data *ctx, int sock)
+{
+       ctx->consumer_error_socket = sock;
+}
+
+/*
+ * Set the command socket path.
+ */
+
+void lttng_consumer_set_command_sock_path(
+               struct lttng_consumer_local_data *ctx, char *sock)
+{
+       ctx->consumer_command_sock_path = sock;
+}
+
+/*
+ * Send return code to the session daemon.
+ * If the socket is not defined, we return 0, it is not a fatal error
+ */
+int lttng_consumer_send_error(
+               struct lttng_consumer_local_data *ctx, int cmd)
+{
+       if (ctx->consumer_error_socket > 0) {
+               return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
+                               sizeof(enum lttcomm_sessiond_command));
+       }
+
+       return 0;
+}
+
+/*
+ * Close all the tracefiles and stream fds, should be called when all instances
+ * are destroyed.
+ */
+void lttng_consumer_cleanup(void)
+{
+       struct lttng_consumer_stream *iter, *tmp;
+       struct lttng_consumer_channel *citer, *ctmp;
+
+       /*
+        * close all outfd. Called when there are no more threads
+        * running (after joining on the threads), no need to protect
+        * list iteration with mutex.
+        */
+       cds_list_for_each_entry_safe(iter, tmp,
+                       &consumer_data.stream_list.head, list) {
+               consumer_del_stream(iter);
+       }
+       cds_list_for_each_entry_safe(citer, ctmp,
+                       &consumer_data.channel_list.head, list) {
+               consumer_del_channel(citer);
+       }
+}
+
+/*
+ * Called from signal handler.
+ */
+void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       consumer_quit = 1;
+       ret = write(ctx->consumer_should_quit[1], "4", 1);
+       if (ret < 0) {
+               perror("write consumer quit");
+       }
+}
+
+void lttng_consumer_sync_trace_file(
+               struct lttng_consumer_stream *stream, off_t orig_offset)
+{
+       int outfd = stream->out_fd;
+
+       /*
+        * This does a blocking write-and-wait on any page that belongs to the
+        * subbuffer prior to the one we just wrote.
+        * Don't care about error values, as these are just hints and ways to
+        * limit the amount of page cache used.
+        */
+       if (orig_offset < stream->chan->max_sb_size) {
+               return;
+       }
+       sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
+                       stream->chan->max_sb_size,
+                       SYNC_FILE_RANGE_WAIT_BEFORE
+                       | SYNC_FILE_RANGE_WRITE
+                       | SYNC_FILE_RANGE_WAIT_AFTER);
+       /*
+        * Give hints to the kernel about how we access the file:
+        * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
+        * we write it.
+        *
+        * We need to call fadvise again after the file grows because the
+        * kernel does not seem to apply fadvise to non-existing parts of the
+        * file.
+        *
+        * Call fadvise _after_ having waited for the page writeback to
+        * complete because the dirty page writeback semantic is not well
+        * defined. So it can be expected to lead to lower throughput in
+        * streaming.
+        */
+       posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
+                       stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
+}
+
+/*
+ * Initialise the necessary environnement :
+ * - create a new context
+ * - create the poll_pipe
+ * - create the should_quit pipe (for signal handler)
+ * - create the thread pipe (for splice)
+ *
+ * Takes a function pointer as argument, this function is called when data is
+ * available on a buffer. This function is responsible to do the
+ * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
+ * buffer configuration and then kernctl_put_next_subbuf at the end.
+ *
+ * Returns a pointer to the new context or NULL on error.
+ */
+struct lttng_consumer_local_data *lttng_consumer_create(
+               enum lttng_consumer_type type,
+               int (*buffer_ready)(struct lttng_consumer_stream *stream),
+               int (*recv_channel)(struct lttng_consumer_channel *channel),
+               int (*recv_stream)(struct lttng_consumer_stream *stream),
+               int (*update_stream)(int stream_key, uint32_t state))
+{
+       int ret, i;
+       struct lttng_consumer_local_data *ctx;
+
+       assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
+               consumer_data.type == type);
+       consumer_data.type = type;
+
+       ctx = malloc(sizeof(struct lttng_consumer_local_data));
+       if (ctx == NULL) {
+               perror("allocating context");
+               goto error;
+       }
+
+       ctx->consumer_error_socket = -1;
+       /* assign the callbacks */
+       ctx->on_buffer_ready = buffer_ready;
+       ctx->on_recv_channel = recv_channel;
+       ctx->on_recv_stream = recv_stream;
+       ctx->on_update_stream = update_stream;
+
+       ret = pipe(ctx->consumer_poll_pipe);
+       if (ret < 0) {
+               perror("Error creating poll pipe");
+               goto error_poll_pipe;
+       }
+
+       ret = pipe(ctx->consumer_should_quit);
+       if (ret < 0) {
+               perror("Error creating recv pipe");
+               goto error_quit_pipe;
+       }
+
+       ret = pipe(ctx->consumer_thread_pipe);
+       if (ret < 0) {
+               perror("Error creating thread pipe");
+               goto error_thread_pipe;
+       }
+
+       return ctx;
+
+
+error_thread_pipe:
+       for (i = 0; i < 2; i++) {
+               int err;
+
+               err = close(ctx->consumer_should_quit[i]);
+               assert(!err);
+       }
+error_quit_pipe:
+       for (i = 0; i < 2; i++) {
+               int err;
+
+               err = close(ctx->consumer_poll_pipe[i]);
+               assert(!err);
+       }
+error_poll_pipe:
+       free(ctx);
+error:
+       return NULL;
+}
+
+/*
+ * Close all fds associated with the instance and free the context.
+ */
+void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
+{
+       close(ctx->consumer_error_socket);
+       close(ctx->consumer_thread_pipe[0]);
+       close(ctx->consumer_thread_pipe[1]);
+       close(ctx->consumer_poll_pipe[0]);
+       close(ctx->consumer_poll_pipe[1]);
+       close(ctx->consumer_should_quit[0]);
+       close(ctx->consumer_should_quit[1]);
+       unlink(ctx->consumer_command_sock_path);
+       free(ctx);
+}
+
+/*
+ * Mmap the ring buffer, read it and write the data to the tracefile.
+ *
+ * Returns the number of bytes written
+ */
+int lttng_consumer_on_read_subbuffer_mmap(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
+       case LTTNG_CONSUMER_UST:
+               return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+       }
+}
+
+/*
+ * Splice the data from the ring buffer to the tracefile.
+ *
+ * Returns the number of bytes spliced.
+ */
+int lttng_consumer_on_read_subbuffer_splice(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
+       case LTTNG_CONSUMER_UST:
+               return -ENOSYS;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+
+}
+
+/*
+ * Take a snapshot for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_take_snapshot(ctx, stream);
+       case LTTNG_CONSUMER_UST:
+               return lttng_ustconsumer_take_snapshot(ctx, stream);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+
+}
+
+/*
+ * Get the produced position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_get_produced_snapshot(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream,
+               unsigned long *pos)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
+       case LTTNG_CONSUMER_UST:
+               return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
+
+int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
+               int sock, struct pollfd *consumer_sockpoll)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
+       case LTTNG_CONSUMER_UST:
+               return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
+
+/*
+ * This thread polls the fds in the ltt_fd_list to consume the data and write
+ * it to tracefile if necessary.
+ */
+void *lttng_consumer_thread_poll_fds(void *data)
+{
+       int num_rdy, num_hup, high_prio, ret, i;
+       struct pollfd *pollfd = NULL;
+       /* local view of the streams */
+       struct lttng_consumer_stream **local_stream = NULL;
+       /* local view of consumer_data.fds_count */
+       int nb_fd = 0;
+       char tmp;
+       int tmp2;
+       struct lttng_consumer_local_data *ctx = data;
+
+       local_stream = malloc(sizeof(struct lttng_consumer_stream));
+
+       while (1) {
+               high_prio = 0;
+               num_hup = 0;
+
+               /*
+                * the ltt_fd_list has been updated, we need to update our
+                * local array as well
+                */
+               pthread_mutex_lock(&consumer_data.lock);
+               if (consumer_data.need_update) {
+                       if (pollfd != NULL) {
+                               free(pollfd);
+                               pollfd = NULL;
+                       }
+                       if (local_stream != NULL) {
+                               free(local_stream);
+                               local_stream = NULL;
+                       }
+
+                       /* allocate for all fds + 1 for the consumer_poll_pipe */
+                       pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+                       if (pollfd == NULL) {
+                               perror("pollfd malloc");
+                               pthread_mutex_unlock(&consumer_data.lock);
+                               goto end;
+                       }
+
+                       /* allocate for all fds + 1 for the consumer_poll_pipe */
+                       local_stream = malloc((consumer_data.stream_count + 1) *
+                                       sizeof(struct lttng_consumer_stream));
+                       if (local_stream == NULL) {
+                               perror("local_stream malloc");
+                               pthread_mutex_unlock(&consumer_data.lock);
+                               goto end;
+                       }
+                       ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
+                       if (ret < 0) {
+                               ERR("Error in allocating pollfd or local_outfds");
+                               lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
+                               pthread_mutex_unlock(&consumer_data.lock);
+                               goto end;
+                       }
+                       nb_fd = ret;
+                       consumer_data.need_update = 0;
+               }
+               pthread_mutex_unlock(&consumer_data.lock);
+
+               /* poll on the array of fds */
+               DBG("polling on %d fd", nb_fd + 1);
+               num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
+               DBG("poll num_rdy : %d", num_rdy);
+               if (num_rdy == -1) {
+                       perror("Poll error");
+                       lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
+                       goto end;
+               } else if (num_rdy == 0) {
+                       DBG("Polling thread timed out");
+                       goto end;
+               }
+
+               /* No FDs and consumer_quit, kconsumer_cleanup the thread */
+               if (nb_fd == 0 && consumer_quit == 1) {
+                       goto end;
+               }
+
+               /*
+                * If the consumer_poll_pipe triggered poll go
+                * directly to the beginning of the loop to update the
+                * array. We want to prioritize array update over
+                * low-priority reads.
+                */
+               if (pollfd[nb_fd].revents == POLLIN) {
+                       DBG("consumer_poll_pipe wake up");
+                       tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
+                       if (tmp2 < 0) {
+                               perror("read kconsumer poll");
+                       }
+                       continue;
+               }
+
+               /* Take care of high priority channels first. */
+               for (i = 0; i < nb_fd; i++) {
+                       switch(pollfd[i].revents) {
+                       case POLLERR:
+                               ERR("Error returned in polling fd %d.", pollfd[i].fd);
+                               consumer_del_stream(local_stream[i]);
+                               num_hup++;
+                               break;
+                       case POLLHUP:
+                               DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+                               consumer_del_stream(local_stream[i]);
+                               num_hup++;
+                               break;
+                       case POLLNVAL:
+                               ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
+                               consumer_del_stream(local_stream[i]);
+                               num_hup++;
+                               break;
+                       case POLLPRI:
+                               DBG("Urgent read on fd %d", pollfd[i].fd);
+                               high_prio = 1;
+                               ret = ctx->on_buffer_ready(local_stream[i]);
+                               /* it's ok to have an unavailable sub-buffer */
+                               if (ret == EAGAIN) {
+                                       ret = 0;
+                               }
+                               break;
+                       }
+               }
+
+               /* If every buffer FD has hung up, we end the read loop here */
+               if (nb_fd > 0 && num_hup == nb_fd) {
+                       DBG("every buffer FD has hung up\n");
+                       if (consumer_quit == 1) {
+                               goto end;
+                       }
+                       continue;
+               }
+
+               /* Take care of low priority channels. */
+               if (high_prio == 0) {
+                       for (i = 0; i < nb_fd; i++) {
+                               if (pollfd[i].revents == POLLIN) {
+                                       DBG("Normal read on fd %d", pollfd[i].fd);
+                                       ret = ctx->on_buffer_ready(local_stream[i]);
+                                       /* it's ok to have an unavailable subbuffer */
+                                       if (ret == EAGAIN) {
+                                               ret = 0;
+                                       }
+                               }
+                       }
+               }
+       }
+end:
+       DBG("polling thread exiting");
+       if (pollfd != NULL) {
+               free(pollfd);
+               pollfd = NULL;
+       }
+       if (local_stream != NULL) {
+               free(local_stream);
+               local_stream = NULL;
+       }
+       return NULL;
+}
+
+/*
+ * This thread listens on the consumerd socket and receives the file
+ * descriptors from the session daemon.
+ */
+void *lttng_consumer_thread_receive_fds(void *data)
+{
+       int sock, client_socket, ret;
+       /*
+        * structure to poll for incoming data on communication socket avoids
+        * making blocking sockets.
+        */
+       struct pollfd consumer_sockpoll[2];
+       struct lttng_consumer_local_data *ctx = data;
+
+       DBG("Creating command socket %s", ctx->consumer_command_sock_path);
+       unlink(ctx->consumer_command_sock_path);
+       client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
+       if (client_socket < 0) {
+               ERR("Cannot create command socket");
+               goto end;
+       }
+
+       ret = lttcomm_listen_unix_sock(client_socket);
+       if (ret < 0) {
+               goto end;
+       }
+
+       DBG("Sending ready command to ltt-sessiond");
+       ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
+       /* return < 0 on error, but == 0 is not fatal */
+       if (ret < 0) {
+               ERR("Error sending ready command to ltt-sessiond");
+               goto end;
+       }
+
+       ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               perror("fcntl O_NONBLOCK");
+               goto end;
+       }
+
+       /* prepare the FDs to poll : to client socket and the should_quit pipe */
+       consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
+       consumer_sockpoll[0].events = POLLIN | POLLPRI;
+       consumer_sockpoll[1].fd = client_socket;
+       consumer_sockpoll[1].events = POLLIN | POLLPRI;
+
+       if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               goto end;
+       }
+       DBG("Connection on client_socket");
+
+       /* Blocking call, waiting for transmission */
+       sock = lttcomm_accept_unix_sock(client_socket);
+       if (sock <= 0) {
+               WARN("On accept");
+               goto end;
+       }
+       ret = fcntl(sock, F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               perror("fcntl O_NONBLOCK");
+               goto end;
+       }
+
+       /* update the polling structure to poll on the established socket */
+       consumer_sockpoll[1].fd = sock;
+       consumer_sockpoll[1].events = POLLIN | POLLPRI;
+
+       while (1) {
+               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+                       goto end;
+               }
+               DBG("Incoming command on sock");
+               ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
+               if (ret == -ENOENT) {
+                       DBG("Received STOP command");
+                       goto end;
+               }
+               if (ret < 0) {
+                       ERR("Communication interrupted on command socket");
+                       goto end;
+               }
+               if (consumer_quit) {
+                       DBG("consumer_thread_receive_fds received quit from signal");
+                       goto end;
+               }
+               DBG("received fds on sock");
+       }
+end:
+       DBG("consumer_thread_receive_fds exiting");
+
+       /*
+        * when all fds have hung up, the polling thread
+        * can exit cleanly
+        */
+       consumer_quit = 1;
+
+       /*
+        * 2s of grace period, if no polling events occur during
+        * this period, the polling thread will exit even if there
+        * are still open FDs (should not happen, but safety mechanism).
+        */
+       consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
+
+       /* wake up the polling thread */
+       ret = write(ctx->consumer_poll_pipe[1], "4", 1);
+       if (ret < 0) {
+               perror("poll pipe write");
+       }
+       return NULL;
+}
diff --git a/liblttng-kconsumer/Makefile.am b/liblttng-kconsumer/Makefile.am
new file mode 100644 (file)
index 0000000..15021cd
--- /dev/null
@@ -0,0 +1,8 @@
+AM_CPPFLAGS = -I$(top_srcdir)/include
+
+noinst_LTLIBRARIES = liblttng-kconsumer.la
+
+liblttng_kconsumer_la_SOURCES = lttng-kconsumer.c
+
+liblttng_kconsumer_la_LIBADD = \
+               $(top_builddir)/libkernelctl/libkernelctl.la
diff --git a/liblttng-kconsumer/lttng-kconsumer.c b/liblttng-kconsumer/lttng-kconsumer.c
new file mode 100644 (file)
index 0000000..c4ebb01
--- /dev/null
@@ -0,0 +1,305 @@
+/*
+ * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
+ *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+ */
+
+#define _GNU_SOURCE
+#include <assert.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <lttng-kernel-ctl.h>
+#include <lttng-sessiond-comm.h>
+#include <lttng/lttng-kconsumer.h>
+#include <lttngerr.h>
+
+extern struct lttng_consumer_global_data consumer_data;
+extern int consumer_poll_timeout;
+extern volatile int consumer_quit;
+
+/*
+ * Mmap the ring buffer, read it and write the data to the tracefile.
+ *
+ * Returns the number of bytes written
+ */
+int lttng_kconsumer_on_read_subbuffer_mmap(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len)
+{
+       unsigned long mmap_offset;
+       long ret = 0;
+       off_t orig_offset = stream->out_fd_offset;
+       int fd = stream->wait_fd;
+       int outfd = stream->out_fd;
+
+       /* get the offset inside the fd to mmap */
+       ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
+       if (ret != 0) {
+               ret = -errno;
+               perror("kernctl_get_mmap_read_offset");
+               goto end;
+       }
+
+       while (len > 0) {
+               ret = write(outfd, stream->mmap_base + mmap_offset, len);
+               if (ret >= len) {
+                       len = 0;
+               } else if (ret < 0) {
+                       ret = -errno;
+                       perror("Error in file write");
+                       goto end;
+               }
+               /* This won't block, but will start writeout asynchronously */
+               sync_file_range(outfd, stream->out_fd_offset, ret,
+                               SYNC_FILE_RANGE_WRITE);
+               stream->out_fd_offset += ret;
+       }
+
+       lttng_consumer_sync_trace_file(stream, orig_offset);
+
+       goto end;
+
+end:
+       return ret;
+}
+
+/*
+ * Splice the data from the ring buffer to the tracefile.
+ *
+ * Returns the number of bytes spliced.
+ */
+int lttng_kconsumer_on_read_subbuffer_splice(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len)
+{
+       long ret = 0;
+       loff_t offset = 0;
+       off_t orig_offset = stream->out_fd_offset;
+       int fd = stream->wait_fd;
+       int outfd = stream->out_fd;
+
+       while (len > 0) {
+               DBG("splice chan to pipe offset %lu (fd : %d)",
+                               (unsigned long)offset, fd);
+               ret = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
+                               SPLICE_F_MOVE | SPLICE_F_MORE);
+               DBG("splice chan to pipe ret %ld", ret);
+               if (ret < 0) {
+                       ret = errno;
+                       perror("Error in relay splice");
+                       goto splice_error;
+               }
+
+               ret = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, ret,
+                               SPLICE_F_MOVE | SPLICE_F_MORE);
+               DBG("splice pipe to file %ld", ret);
+               if (ret < 0) {
+                       ret = errno;
+                       perror("Error in file splice");
+                       goto splice_error;
+               }
+               len -= ret;
+               /* This won't block, but will start writeout asynchronously */
+               sync_file_range(outfd, stream->out_fd_offset, ret,
+                               SYNC_FILE_RANGE_WRITE);
+               stream->out_fd_offset += ret;
+       }
+       lttng_consumer_sync_trace_file(stream, orig_offset);
+
+       goto end;
+
+splice_error:
+       /* send the appropriate error description to sessiond */
+       switch(ret) {
+       case EBADF:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
+               break;
+       case EINVAL:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
+               break;
+       case ENOMEM:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
+               break;
+       case ESPIPE:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
+               break;
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Take a snapshot for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+       int infd = stream->wait_fd;
+
+       ret = kernctl_snapshot(infd);
+       if (ret != 0) {
+               ret = errno;
+               perror("Getting sub-buffer snapshot.");
+       }
+
+       return ret;
+}
+
+/*
+ * Get the produced position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_get_produced_snapshot(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream,
+               unsigned long *pos)
+{
+       int ret;
+       int infd = stream->wait_fd;
+
+       ret = kernctl_snapshot_get_produced(infd, pos);
+       if (ret != 0) {
+               ret = errno;
+               perror("kernctl_snapshot_get_produced");
+       }
+
+       return ret;
+}
+
+int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
+               int sock, struct pollfd *consumer_sockpoll)
+{
+       ssize_t ret;
+       struct lttcomm_consumer_msg msg;
+
+       ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
+       if (ret != sizeof(msg)) {
+               lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+               return ret;
+       }
+       if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
+               return -ENOENT;
+       }
+
+       switch (msg.cmd_type) {
+       case LTTNG_CONSUMER_ADD_CHANNEL:
+       {
+               struct lttng_consumer_channel *new_channel;
+
+               DBG("consumer_add_channel %d", msg.u.channel.channel_key);
+               new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
+                               -1, -1,
+                               msg.u.channel.mmap_len,
+                               msg.u.channel.max_sb_size);
+               if (new_channel == NULL) {
+                       lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
+                       goto end_nosignal;
+               }
+               if (ctx->on_recv_channel != NULL) {
+                       ret = ctx->on_recv_channel(new_channel);
+                       if (ret == 0) {
+                               consumer_add_channel(new_channel);
+                       } else if (ret < 0) {
+                               goto end_nosignal;
+                       }
+               } else {
+                       consumer_add_channel(new_channel);
+               }
+               goto end_nosignal;
+       }
+       case LTTNG_CONSUMER_ADD_STREAM:
+       {
+               struct lttng_consumer_stream *new_stream;
+               int fds[1];
+               size_t nb_fd = 1;
+
+               /* block */
+               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+                       return -EINTR;
+               }
+               ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
+               if (ret != sizeof(fds)) {
+                       lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+                       return ret;
+               }
+               if (nb_fd < 2)
+                       fds[1] = fds[0];        /* duplicate same fd if recv only one */
+
+               DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
+                       fds[0], fds[1]);
+               new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
+                               msg.u.stream.stream_key,
+                               fds[0], fds[1],
+                               msg.u.stream.state,
+                               msg.u.stream.mmap_len,
+                               msg.u.stream.output,
+                               msg.u.stream.path_name);
+               if (new_stream == NULL) {
+                       lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
+                       goto end;
+               }
+               if (ctx->on_recv_stream != NULL) {
+                       ret = ctx->on_recv_stream(new_stream);
+                       if (ret == 0) {
+                               consumer_add_stream(new_stream);
+                       } else if (ret < 0) {
+                               goto end;
+                       }
+               } else {
+                       consumer_add_stream(new_stream);
+               }
+               break;
+       }
+       case LTTNG_CONSUMER_UPDATE_STREAM:
+       {
+               if (ctx->on_update_stream != NULL) {
+                       ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
+                       if (ret == 0) {
+                               consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
+                       } else if (ret < 0) {
+                               goto end;
+                       }
+               } else {
+                       consumer_change_stream_state(msg.u.stream.stream_key,
+                               msg.u.stream.state);
+               }
+               break;
+       }
+       default:
+               break;
+       }
+end:
+       /* signal the poll thread */
+       ret = write(ctx->consumer_poll_pipe[1], "4", 1);
+       if (ret < 0) {
+               perror("write consumer poll");
+       }
+end_nosignal:
+       return 0;
+}
index be82cc1ec8c4f7cc1d353a55b8454e91b83ed2df..4bb6ed5783488cf2293d833339cd13de9eafda1f 100644 (file)
@@ -27,6 +27,7 @@
 #include <sys/types.h>
 #include <sys/un.h>
 #include <unistd.h>
+#include <errno.h>
 
 #include <lttng-sessiond-comm.h>
 
@@ -75,19 +76,19 @@ static const char *lttcomm_readable_code[] = {
        [ LTTCOMM_ERR_INDEX(LTTCOMM_UST_SESS_FAIL) ] = "UST create session failed",
        [ LTTCOMM_ERR_INDEX(LTTCOMM_UST_CHAN_NOT_FOUND) ] = "UST channel not found",
        [ LTTCOMM_ERR_INDEX(LTTCOMM_UST_CHAN_FAIL) ] = "UST create channel failed",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_COMMAND_SOCK_READY) ] = "Kconsumerd command socket ready",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_SUCCESS_RECV_FD) ] = "Kconsumerd success on receiving fds",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_ERROR_RECV_FD) ] = "Kconsumerd error on receiving fds",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_POLL_ERROR) ] = "Kconsumerd error in polling thread",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_POLL_NVAL) ] = "Kconsumerd polling on closed fd",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_POLL_HUP) ] = "Kconsumerd all fd hung up",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_EXIT_SUCCESS) ] = "Kconsumerd exiting normally",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_EXIT_FAILURE) ] = "Kconsumerd exiting on error",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_OUTFD_ERROR) ] = "Kconsumerd error opening the tracefile",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_SPLICE_EBADF) ] = "Kconsumerd splice EBADF",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_SPLICE_EINVAL) ] = "Kconsumerd splice EINVAL",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_SPLICE_ENOMEM) ] = "Kconsumerd splice ENOMEM",
-       [ LTTCOMM_ERR_INDEX(KCONSUMERD_SPLICE_ESPIPE) ] = "Kconsumerd splice ESPIPE",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_COMMAND_SOCK_READY) ] = "consumerd command socket ready",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_SUCCESS_RECV_FD) ] = "consumerd success on receiving fds",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_ERROR_RECV_FD) ] = "consumerd error on receiving fds",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_POLL_ERROR) ] = "consumerd error in polling thread",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_POLL_NVAL) ] = "consumerd polling on closed fd",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_POLL_HUP) ] = "consumerd all fd hung up",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_EXIT_SUCCESS) ] = "consumerd exiting normally",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_EXIT_FAILURE) ] = "consumerd exiting on error",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_OUTFD_ERROR) ] = "consumerd error opening the tracefile",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_SPLICE_EBADF) ] = "consumerd splice EBADF",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_SPLICE_EINVAL) ] = "consumerd splice EINVAL",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_SPLICE_ENOMEM) ] = "consumerd splice ENOMEM",
+       [ LTTCOMM_ERR_INDEX(CONSUMERD_SPLICE_ESPIPE) ] = "consumerd splice ESPIPE",
        [ LTTCOMM_ERR_INDEX(LTTCOMM_NO_EVENT) ] = "Event not found",
 };
 
@@ -285,10 +286,11 @@ int lttcomm_close_unix_sock(int sock)
 }
 
 /*
- * Send multiple fds on a unix socket.
+ * Send a message accompanied by fd(s) over a unix socket.
+ *
+ * Returns the size of data sent, or negative error value.
  */
-ssize_t lttcomm_send_fds_unix_sock(int sock, void *buf, int *fds,
-               size_t nb_fd, size_t len)
+ssize_t lttcomm_send_fds_unix_sock(int sock, int *fds, size_t nb_fd)
 {
        struct msghdr msg = { 0 };
        struct cmsghdr *cmptr;
@@ -296,11 +298,10 @@ ssize_t lttcomm_send_fds_unix_sock(int sock, void *buf, int *fds,
        ssize_t ret = -1;
        unsigned int sizeof_fds = nb_fd * sizeof(int);
        char tmp[CMSG_SPACE(sizeof_fds)];
+       char dummy = 0;
 
-       /*
-        * Note: the consumerd receiver only supports receiving one FD per message.
-        */
-       assert(nb_fd == 1);
+       if (nb_fd > LTTCOMM_MAX_SEND_FDS)
+               return -EINVAL;
 
        msg.msg_control = (caddr_t)tmp;
        msg.msg_controllen = CMSG_LEN(sizeof_fds);
@@ -313,8 +314,8 @@ ssize_t lttcomm_send_fds_unix_sock(int sock, void *buf, int *fds,
        /* Sum of the length of all control messages in the buffer: */
        msg.msg_controllen = cmptr->cmsg_len;
 
-       iov[0].iov_base = buf;
-       iov[0].iov_len = len;
+       iov[0].iov_base = &dummy;
+       iov[0].iov_len = 1;
        msg.msg_iov = iov;
        msg.msg_iovlen = 1;
 
@@ -322,31 +323,30 @@ ssize_t lttcomm_send_fds_unix_sock(int sock, void *buf, int *fds,
        if (ret < 0) {
                perror("sendmsg");
        }
-
        return ret;
 }
 
 /*
- * Receives a single fd from socket.
+ * Recv a message accompanied by fd(s) from a unix socket.
  *
- * Returns the size of received data
+ * Returns the size of received data, or negative error value.
+ *
+ * Expect at most "nb_fd" file descriptors. Returns the number of fd
+ * actually received in nb_fd.
  */
-ssize_t lttcomm_recv_fds_unix_sock(int sock, void *buf, int *fds,
-               size_t nb_fd, size_t len)
+ssize_t lttcomm_recv_fds_unix_sock(int sock, int *fds, size_t nb_fd)
 {
        struct iovec iov[1];
-       int data_fd, i, ret = 0;
+       ssize_t ret = 0;
        struct cmsghdr *cmsg;
-       char recv_fd[CMSG_SPACE(sizeof(int))];
+       size_t sizeof_fds = nb_fd * sizeof(int);
+       char recv_fd[CMSG_SPACE(sizeof_fds)];
        struct msghdr msg = { 0 };
-       union {
-               unsigned char vc[4];
-               int vi;
-       } tmp;
+       char dummy;
 
        /* Prepare to receive the structures */
-       iov[0].iov_base = &data_fd;
-       iov[0].iov_len = sizeof(data_fd);
+       iov[0].iov_base = &dummy;
+       iov[0].iov_len = 1;
        msg.msg_iov = iov;
        msg.msg_iovlen = 1;
        msg.msg_control = recv_fd;
@@ -357,33 +357,35 @@ ssize_t lttcomm_recv_fds_unix_sock(int sock, void *buf, int *fds,
                perror("recvmsg fds");
                goto end;
        }
-
-       if (ret != sizeof(data_fd)) {
-               fprintf(stderr, "Error: Received %d bytes, expected %ld",
-                               ret, sizeof(data_fd));
+       if (ret != 1) {
+               fprintf(stderr, "Error: Received %zd bytes, expected %d\n",
+                               ret, 1);
+               goto end;
+       }
+       if (msg.msg_flags & MSG_CTRUNC) {
+               fprintf(stderr, "Error: Control message truncated.\n");
+               ret = -1;
                goto end;
        }
-
        cmsg = CMSG_FIRSTHDR(&msg);
        if (!cmsg) {
-               fprintf(stderr, "Error: Invalid control message header");
+               fprintf(stderr, "Error: Invalid control message header\n");
                ret = -1;
                goto end;
        }
-
        if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) {
-               fprintf(stderr, "Didn't received any fd");
+               fprintf(stderr, "Didn't received any fd\n");
                ret = -1;
                goto end;
        }
-
-       /* this is our fd */
-       for (i = 0; i < sizeof(int); i++) {
-               tmp.vc[i] = CMSG_DATA(cmsg)[i];
+       if (cmsg->cmsg_len != CMSG_LEN(sizeof_fds)) {
+               fprintf(stderr, "Error: Received %zu bytes of ancillary data, expected %zu\n",
+                               cmsg->cmsg_len, CMSG_LEN(sizeof_fds));
+               ret = -1;
+               goto end;
        }
-
-       ret = tmp.vi;
-
+       memcpy(fds, CMSG_DATA(cmsg), sizeof_fds);
+       ret = sizeof_fds;
 end:
        return ret;
 }
diff --git a/liblttng-ustconsumer/Makefile.am b/liblttng-ustconsumer/Makefile.am
new file mode 100644 (file)
index 0000000..c181a47
--- /dev/null
@@ -0,0 +1,10 @@
+AM_CPPFLAGS = -I$(top_srcdir)/include
+
+if LTTNG_TOOLS_HAVE_UST
+noinst_LTLIBRARIES = liblttng-ustconsumer.la
+
+liblttng_ustconsumer_la_SOURCES = lttng-ustconsumer.c
+
+liblttng_ustconsumer_la_LIBADD = \
+               -lustctl
+endif
diff --git a/liblttng-ustconsumer/lttng-ustconsumer.c b/liblttng-ustconsumer/lttng-ustconsumer.c
new file mode 100644 (file)
index 0000000..1e0bf55
--- /dev/null
@@ -0,0 +1,307 @@
+/*
+ * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
+ *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+ */
+
+#define _GNU_SOURCE
+#include <assert.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <ust/lttng-ust-ctl.h>
+#include <lttng-sessiond-comm.h>
+#include <lttng/lttng-ustconsumer.h>
+#include <lttngerr.h>
+
+extern struct lttng_consumer_global_data consumer_data;
+extern int consumer_poll_timeout;
+extern volatile int consumer_quit;
+
+/*
+ * Mmap the ring buffer, read it and write the data to the tracefile.
+ *
+ * Returns the number of bytes written
+ */
+int lttng_ustconsumer_on_read_subbuffer_mmap(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len)
+{
+       unsigned long mmap_offset;
+       long ret = 0;
+       off_t orig_offset = stream->out_fd_offset;
+       int outfd = stream->out_fd;
+
+       /* get the offset inside the fd to mmap */
+       ret = ustctl_get_mmap_read_offset(stream->chan->handle,
+               stream->buf, &mmap_offset);
+       if (ret != 0) {
+               ret = -errno;
+               perror("ustctl_get_mmap_read_offset");
+               goto end;
+       }
+       while (len > 0) {
+               ret = write(outfd, stream->mmap_base + mmap_offset, len);
+               if (ret >= len) {
+                       len = 0;
+               } else if (ret < 0) {
+                       ret = -errno;
+                       perror("Error in file write");
+                       goto end;
+               }
+               /* This won't block, but will start writeout asynchronously */
+               sync_file_range(outfd, stream->out_fd_offset, ret,
+                               SYNC_FILE_RANGE_WRITE);
+               stream->out_fd_offset += ret;
+       }
+
+       lttng_consumer_sync_trace_file(stream, orig_offset);
+
+       goto end;
+
+end:
+       return ret;
+}
+
+/*
+ * Splice the data from the ring buffer to the tracefile.
+ *
+ * Returns the number of bytes spliced.
+ */
+int lttng_ustconsumer_on_read_subbuffer_splice(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream, unsigned long len)
+{
+       return -ENOSYS;
+}
+
+/*
+ * Take a snapshot for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_ustconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+
+       ret = ustctl_snapshot(stream->chan->handle, stream->buf);
+       if (ret != 0) {
+               ret = errno;
+               perror("Getting sub-buffer snapshot.");
+       }
+
+       return ret;
+}
+
+/*
+ * Get the produced position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_ustconsumer_get_produced_snapshot(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream,
+               unsigned long *pos)
+{
+       int ret;
+
+       ret = ustctl_snapshot_get_produced(stream->chan->handle,
+                       stream->buf, pos);
+       if (ret != 0) {
+               ret = errno;
+               perror("kernctl_snapshot_get_produced");
+       }
+
+       return ret;
+}
+
+int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
+               int sock, struct pollfd *consumer_sockpoll)
+{
+       ssize_t ret;
+       struct lttcomm_consumer_msg msg;
+
+       ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
+       if (ret != sizeof(msg)) {
+               lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+               return ret;
+       }
+       if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
+               return -ENOENT;
+       }
+
+       switch (msg.cmd_type) {
+       case LTTNG_CONSUMER_ADD_CHANNEL:
+       {
+               struct lttng_consumer_channel *new_channel;
+               int fds[1];
+               size_t nb_fd = 1;
+
+               /* block */
+               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+                       return -EINTR;
+               }
+               ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
+               if (ret != sizeof(fds)) {
+                       lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+                       return ret;
+               }
+
+               DBG("consumer_add_channel %d", msg.u.channel.channel_key);
+
+               new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
+                               fds[0], -1,
+                               msg.u.channel.mmap_len,
+                               msg.u.channel.max_sb_size);
+               if (new_channel == NULL) {
+                       lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
+                       goto end_nosignal;
+               }
+               if (ctx->on_recv_channel != NULL) {
+                       ret = ctx->on_recv_channel(new_channel);
+                       if (ret == 0) {
+                               consumer_add_channel(new_channel);
+                       } else if (ret < 0) {
+                               goto end_nosignal;
+                       }
+               } else {
+                       consumer_add_channel(new_channel);
+               }
+               goto end_nosignal;
+       }
+       case LTTNG_CONSUMER_ADD_STREAM:
+       {
+               struct lttng_consumer_stream *new_stream;
+               int fds[2];
+               size_t nb_fd = 2;
+
+               /* block */
+               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+                       return -EINTR;
+               }
+               ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
+               if (ret != sizeof(fds)) {
+                       lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+                       return ret;
+               }
+
+               DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
+                       fds[0], fds[1]);
+               new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
+                               msg.u.stream.stream_key,
+                               fds[0], fds[1],
+                               msg.u.stream.state,
+                               msg.u.stream.mmap_len,
+                               msg.u.stream.output,
+                               msg.u.stream.path_name);
+               if (new_stream == NULL) {
+                       lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
+                       goto end;
+               }
+               if (ctx->on_recv_stream != NULL) {
+                       ret = ctx->on_recv_stream(new_stream);
+                       if (ret == 0) {
+                               consumer_add_stream(new_stream);
+                       } else if (ret < 0) {
+                               goto end;
+                       }
+               } else {
+                       consumer_add_stream(new_stream);
+               }
+               break;
+       }
+       case LTTNG_CONSUMER_UPDATE_STREAM:
+       {
+               if (ctx->on_update_stream != NULL) {
+                       ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
+                       if (ret == 0) {
+                               consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
+                       } else if (ret < 0) {
+                               goto end;
+                       }
+               } else {
+                       consumer_change_stream_state(msg.u.stream.stream_key,
+                               msg.u.stream.state);
+               }
+               break;
+       }
+       default:
+               break;
+       }
+end:
+       /* signal the poll thread */
+       ret = write(ctx->consumer_poll_pipe[1], "4", 1);
+       if (ret < 0) {
+               perror("write consumer poll");
+       }
+end_nosignal:
+       return 0;
+}
+
+int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
+{
+       struct object_data obj;
+
+       obj.handle = -1;
+       obj.shm_fd = chan->shm_fd;
+       obj.wait_fd = chan->wait_fd;
+       obj.memory_map_size = chan->mmap_len;
+       chan->handle = ustctl_map_channel(&obj);
+       if (!chan->handle) {
+               return -ENOMEM;
+       }
+       return 0;
+}
+
+void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
+{
+       ustctl_unmap_channel(chan->handle);
+}
+
+int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
+{
+       struct object_data obj;
+       int ret;
+
+       obj.handle = -1;
+       obj.shm_fd = stream->shm_fd;
+       obj.wait_fd = stream->wait_fd;
+       obj.memory_map_size = stream->mmap_len;
+       ret = ustctl_add_stream(stream->chan->handle, &obj);
+       if (ret)
+               return ret;
+       stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu);
+       if (!stream->buf)
+               return -EBUSY;
+       stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf);
+       if (!stream->mmap_base) {
+               return -EINVAL;
+       }
+       return 0;
+}
+
+void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
+{
+       ustctl_close_stream_read(stream->chan->handle, stream->buf);
+}
diff --git a/liblttngkconsumerd/Makefile.am b/liblttngkconsumerd/Makefile.am
deleted file mode 100644 (file)
index e8c5741..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-AM_CPPFLAGS = -I$(top_srcdir)/include
-
-lib_LTLIBRARIES = liblttngkconsumerd.la
-
-liblttngkconsumerd_la_SOURCES = lttngkconsumerd.c
-
-liblttngkconsumerd_la_LIBADD = \
-               $(top_builddir)/libkernelctl/libkernelctl.la \
-               $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la
diff --git a/liblttngkconsumerd/lttngkconsumerd.c b/liblttngkconsumerd/lttngkconsumerd.c
deleted file mode 100644 (file)
index 1893e0a..0000000
+++ /dev/null
@@ -1,1017 +0,0 @@
-/*
- * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
- *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; only version 2
- * of the License.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
- */
-
-#define _GNU_SOURCE
-#include <assert.h>
-#include <fcntl.h>
-#include <poll.h>
-#include <pthread.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/mman.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-#include <lttng-kernel-ctl.h>
-#include <lttng-sessiond-comm.h>
-#include <lttng/lttng-kconsumerd.h>
-#include <lttngerr.h>
-
-static struct lttng_kconsumerd_global_data {
-       /*
-        * kconsumerd_data.lock protects kconsumerd_data.fd_list,
-        * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It ensures
-        * the count matches the number of items in the fd_list. It ensures the
-        * list updates *always* trigger an fd_array update (therefore need to make
-        * list update vs kconsumerd_data.need_update flag update atomic, and also
-        * flag read, fd array and flag clear atomic).
-        */
-       pthread_mutex_t lock;
-       /*
-        * Number of element for the list below. Protected by kconsumerd_data.lock.
-        */
-       unsigned int fds_count;
-       /*
-        * List of FDs. Protected by kconsumerd_data.lock.
-        */
-       struct lttng_kconsumerd_fd_list fd_list;
-       /*
-        * Flag specifying if the local array of FDs needs update in the poll
-        * function. Protected by kconsumerd_data.lock.
-        */
-       unsigned int need_update;
-} kconsumerd_data = {
-       .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
-       .fds_count = 0,
-       .need_update = 1,
-};
-
-/* timeout parameter, to control the polling thread grace period. */
-static int kconsumerd_poll_timeout = -1;
-
-/*
- * Flag to inform the polling thread to quit when all fd hung up. Updated by
- * the kconsumerd_thread_receive_fds when it notices that all fds has hung up.
- * Also updated by the signal handler (kconsumerd_should_exit()). Read by the
- * polling threads.
- */
-static volatile int kconsumerd_quit = 0;
-
-/*
- * Find a session fd in the global list. The kconsumerd_data.lock must be
- * locked during this call.
- *
- * Return 1 if found else 0.
- */
-static int kconsumerd_find_session_fd(int fd)
-{
-       struct lttng_kconsumerd_fd *iter;
-
-       cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
-               if (iter->sessiond_fd == fd) {
-                       DBG("Duplicate session fd %d", fd);
-                       return 1;
-               }
-       }
-
-       return 0;
-}
-
-/*
- * Remove a fd from the global list protected by a mutex.
- */
-static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf)
-{
-       int ret;
-       pthread_mutex_lock(&kconsumerd_data.lock);
-       cds_list_del(&lcf->list);
-       if (kconsumerd_data.fds_count > 0) {
-               kconsumerd_data.fds_count--;
-               if (lcf != NULL) {
-                       if (lcf->mmap_base != NULL) {
-                               ret = munmap(lcf->mmap_base, lcf->mmap_len);
-                               if (ret != 0) {
-                                       perror("munmap");
-                               }
-                       }
-                       if (lcf->out_fd != 0) {
-                               close(lcf->out_fd);
-                       }
-                       close(lcf->consumerd_fd);
-                       free(lcf);
-                       lcf = NULL;
-               }
-       }
-       kconsumerd_data.need_update = 1;
-       pthread_mutex_unlock(&kconsumerd_data.lock);
-}
-
-/*
- * Create a struct lttcomm_kconsumerd_msg from the
- * information received on the receiving socket
- */
-struct lttng_kconsumerd_fd *kconsumerd_allocate_fd(
-               struct lttcomm_kconsumerd_msg *buf,
-               int consumerd_fd)
-{
-       struct lttng_kconsumerd_fd *tmp_fd;
-
-       tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
-       if (tmp_fd == NULL) {
-               perror("malloc struct lttng_kconsumerd_fd");
-               goto end;
-       }
-
-       tmp_fd->sessiond_fd = buf->fd;
-       tmp_fd->consumerd_fd = consumerd_fd;
-       tmp_fd->state = buf->state;
-       tmp_fd->max_sb_size = buf->max_sb_size;
-       tmp_fd->out_fd = 0;
-       tmp_fd->out_fd_offset = 0;
-       tmp_fd->mmap_len = 0;
-       tmp_fd->mmap_base = NULL;
-       tmp_fd->output = buf->output;
-       strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
-       tmp_fd->path_name[PATH_MAX - 1] = '\0';
-       DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)",
-                       tmp_fd->path_name, tmp_fd->sessiond_fd,
-                       tmp_fd->consumerd_fd, tmp_fd->out_fd);
-
-end:
-       return tmp_fd;
-}
-
-/*
- * Add a fd to the global list protected by a mutex.
- */
-static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd)
-{
-       int ret;
-
-       pthread_mutex_lock(&kconsumerd_data.lock);
-       /* Check if already exist */
-       ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd);
-       if (ret == 1) {
-               goto end;
-       }
-       cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
-       kconsumerd_data.fds_count++;
-       kconsumerd_data.need_update = 1;
-
-end:
-       pthread_mutex_unlock(&kconsumerd_data.lock);
-       return ret;
-}
-
-/*
- * Update a fd according to what we just received.
- */
-static void kconsumerd_change_fd_state(int sessiond_fd,
-               enum lttng_kconsumerd_fd_state state)
-{
-       struct lttng_kconsumerd_fd *iter;
-
-       pthread_mutex_lock(&kconsumerd_data.lock);
-       cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
-               if (iter->sessiond_fd == sessiond_fd) {
-                       iter->state = state;
-                       break;
-               }
-       }
-       kconsumerd_data.need_update = 1;
-       pthread_mutex_unlock(&kconsumerd_data.lock);
-}
-
-/*
- * Allocate the pollfd structure and the local view of the out fds to avoid
- * doing a lookup in the linked list and concurrency issues when writing is
- * needed. Called with kconsumerd_data.lock held.
- *
- * Returns the number of fds in the structures.
- */
-static int kconsumerd_update_poll_array(
-               struct lttng_kconsumerd_local_data *ctx, struct pollfd **pollfd,
-               struct lttng_kconsumerd_fd **local_kconsumerd_fd)
-{
-       struct lttng_kconsumerd_fd *iter;
-       int i = 0;
-
-       DBG("Updating poll fd array");
-       cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
-               if (iter->state == ACTIVE_FD) {
-                       DBG("Active FD %d", iter->consumerd_fd);
-                       (*pollfd)[i].fd = iter->consumerd_fd;
-                       (*pollfd)[i].events = POLLIN | POLLPRI;
-                       local_kconsumerd_fd[i] = iter;
-                       i++;
-               }
-       }
-
-       /*
-        * Insert the kconsumerd_poll_pipe at the end of the array and don't
-        * increment i so nb_fd is the number of real FD.
-        */
-       (*pollfd)[i].fd = ctx->kconsumerd_poll_pipe[0];
-       (*pollfd)[i].events = POLLIN;
-       return i;
-}
-
-/*
- * Receives an array of file descriptors and the associated structures
- * describing each fd (path name).
- *
- * Returns the size of received data
- */
-static int kconsumerd_consumerd_recv_fd(
-               struct lttng_kconsumerd_local_data *ctx, int sfd,
-               struct pollfd *kconsumerd_sockpoll, int size,
-               enum lttng_kconsumerd_command cmd_type)
-{
-       struct iovec iov[1];
-       int ret = 0, i, j, tmp2;
-       struct cmsghdr *cmsg;
-       int nb_fd;
-       char recv_fd[CMSG_SPACE(sizeof(int))];
-       struct lttcomm_kconsumerd_msg lkm;
-       struct lttng_kconsumerd_fd *new_fd;
-       union {
-               unsigned char vc[4];
-               int vi;
-       } tmp;
-
-       /* the number of fds we are about to receive */
-       nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
-
-       /*
-        * nb_fd is the number of fds we receive. One fd per recvmsg.
-        */
-       for (i = 0; i < nb_fd; i++) {
-               struct msghdr msg = { 0 };
-
-               /* Prepare to receive the structures */
-               iov[0].iov_base = &lkm;
-               iov[0].iov_len = sizeof(lkm);
-               msg.msg_iov = iov;
-               msg.msg_iovlen = 1;
-
-               msg.msg_control = recv_fd;
-               msg.msg_controllen = sizeof(recv_fd);
-
-               DBG("Waiting to receive fd");
-               if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
-                       goto end;
-               }
-
-               if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
-                       perror("recvmsg");
-                       continue;
-               }
-
-               if (ret != (size / nb_fd)) {
-                       ERR("Received only %d, expected %d", ret, size);
-                       lttng_kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
-                       goto end;
-               }
-
-               cmsg = CMSG_FIRSTHDR(&msg);
-               if (!cmsg) {
-                       ERR("Invalid control message header");
-                       ret = -1;
-                       lttng_kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
-                       goto end;
-               }
-
-               /* if we received fds */
-               if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
-                       switch (cmd_type) {
-                               case ADD_STREAM:
-                                       for (j = 0; j < sizeof(int); j++)
-                                               tmp.vc[j] = CMSG_DATA(cmsg)[j];
-                                       DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, tmp.vi);
-                                       new_fd = kconsumerd_allocate_fd(&lkm, tmp.vi);
-                                       if (new_fd == NULL) {
-                                               lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR);
-                                               goto end;
-                                       }
-
-                                       if (ctx->on_recv_fd != NULL) {
-                                               ret = ctx->on_recv_fd(new_fd);
-                                               if (ret == 0) {
-                                                       kconsumerd_add_fd(new_fd);
-                                               } else if (ret < 0) {
-                                                       goto end;
-                                               }
-                                       } else {
-                                               kconsumerd_add_fd(new_fd);
-                                       }
-                                       break;
-                               case UPDATE_STREAM:
-                                       if (ctx->on_update_fd != NULL) {
-                                               ret = ctx->on_update_fd(lkm.fd, lkm.state);
-                                               if (ret == 0) {
-                                                       kconsumerd_change_fd_state(lkm.fd, lkm.state);
-                                               } else if (ret < 0) {
-                                                       goto end;
-                                               }
-                                       } else {
-                                               kconsumerd_change_fd_state(lkm.fd, lkm.state);
-                                       }
-                                       break;
-                               default:
-                                       break;
-                       }
-                       /* signal the poll thread */
-                       tmp2 = write(ctx->kconsumerd_poll_pipe[1], "4", 1);
-                       if (tmp2 < 0) {
-                               perror("write kconsumerd poll");
-                       }
-               } else {
-                       ERR("Didn't received any fd");
-                       lttng_kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
-                       ret = -1;
-                       goto end;
-               }
-       }
-
-end:
-       return ret;
-}
-
-/*
- * Set the error socket.
- */
-void lttng_kconsumerd_set_error_sock(
-               struct lttng_kconsumerd_local_data *ctx, int sock)
-{
-       ctx->kconsumerd_error_socket = sock;
-}
-
-/*
- * Set the command socket path.
- */
-
-void lttng_kconsumerd_set_command_sock_path(
-               struct lttng_kconsumerd_local_data *ctx, char *sock)
-{
-       ctx->kconsumerd_command_sock_path = sock;
-}
-
-static void lttng_kconsumerd_sync_trace_file(
-               struct lttng_kconsumerd_fd *kconsumerd_fd, off_t orig_offset)
-{
-       int outfd = kconsumerd_fd->out_fd;
-       /*
-        * This does a blocking write-and-wait on any page that belongs to the
-        * subbuffer prior to the one we just wrote.
-        * Don't care about error values, as these are just hints and ways to
-        * limit the amount of page cache used.
-        */
-       if (orig_offset >= kconsumerd_fd->max_sb_size) {
-               sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
-                               kconsumerd_fd->max_sb_size,
-                               SYNC_FILE_RANGE_WAIT_BEFORE
-                               | SYNC_FILE_RANGE_WRITE
-                               | SYNC_FILE_RANGE_WAIT_AFTER);
-               /*
-                * Give hints to the kernel about how we access the file:
-                * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
-                * we write it.
-                *
-                * We need to call fadvise again after the file grows because the
-                * kernel does not seem to apply fadvise to non-existing parts of the
-                * file.
-                *
-                * Call fadvise _after_ having waited for the page writeback to
-                * complete because the dirty page writeback semantic is not well
-                * defined. So it can be expected to lead to lower throughput in
-                * streaming.
-                */
-               posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
-                               kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
-       }
-}
-
-
-/*
- * Mmap the ring buffer, read it and write the data to the tracefile.
- *
- * Returns the number of bytes written
- */
-int lttng_kconsumerd_on_read_subbuffer_mmap(
-               struct lttng_kconsumerd_local_data *ctx,
-               struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len)
-{
-       unsigned long mmap_offset;
-       long ret = 0;
-       off_t orig_offset = kconsumerd_fd->out_fd_offset;
-       int fd = kconsumerd_fd->consumerd_fd;
-       int outfd = kconsumerd_fd->out_fd;
-
-       /* get the offset inside the fd to mmap */
-       ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
-       if (ret != 0) {
-               ret = errno;
-               perror("kernctl_get_mmap_read_offset");
-               goto end;
-       }
-
-       while (len > 0) {
-               ret = write(outfd, kconsumerd_fd->mmap_base + mmap_offset, len);
-               if (ret >= len) {
-                       len = 0;
-               } else if (ret < 0) {
-                       ret = errno;
-                       perror("Error in file write");
-                       goto end;
-               }
-               /* This won't block, but will start writeout asynchronously */
-               sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
-                               SYNC_FILE_RANGE_WRITE);
-               kconsumerd_fd->out_fd_offset += ret;
-       }
-
-       lttng_kconsumerd_sync_trace_file(kconsumerd_fd, orig_offset);
-
-       goto end;
-
-end:
-       return ret;
-}
-
-/*
- * Splice the data from the ring buffer to the tracefile.
- *
- * Returns the number of bytes spliced.
- */
-int lttng_kconsumerd_on_read_subbuffer_splice(
-               struct lttng_kconsumerd_local_data *ctx,
-               struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len)
-{
-       long ret = 0;
-       loff_t offset = 0;
-       off_t orig_offset = kconsumerd_fd->out_fd_offset;
-       int fd = kconsumerd_fd->consumerd_fd;
-       int outfd = kconsumerd_fd->out_fd;
-
-       while (len > 0) {
-               DBG("splice chan to pipe offset %lu (fd : %d)",
-                               (unsigned long)offset, fd);
-               ret = splice(fd, &offset, ctx->kconsumerd_thread_pipe[1], NULL, len,
-                               SPLICE_F_MOVE | SPLICE_F_MORE);
-               DBG("splice chan to pipe ret %ld", ret);
-               if (ret < 0) {
-                       ret = errno;
-                       perror("Error in relay splice");
-                       goto splice_error;
-               }
-
-               ret = splice(ctx->kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
-                               SPLICE_F_MOVE | SPLICE_F_MORE);
-               DBG("splice pipe to file %ld", ret);
-               if (ret < 0) {
-                       ret = errno;
-                       perror("Error in file splice");
-                       goto splice_error;
-               }
-               len -= ret;
-               /* This won't block, but will start writeout asynchronously */
-               sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
-                               SYNC_FILE_RANGE_WRITE);
-               kconsumerd_fd->out_fd_offset += ret;
-       }
-       lttng_kconsumerd_sync_trace_file(kconsumerd_fd, orig_offset);
-
-       goto end;
-
-splice_error:
-       /* send the appropriate error description to sessiond */
-       switch(ret) {
-       case EBADF:
-               lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EBADF);
-               break;
-       case EINVAL:
-               lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EINVAL);
-               break;
-       case ENOMEM:
-               lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ENOMEM);
-               break;
-       case ESPIPE:
-               lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ESPIPE);
-               break;
-       }
-
-end:
-       return ret;
-}
-
-/*
- * Take a snapshot for a specific fd
- *
- * Returns 0 on success, < 0 on error
- */
-int lttng_kconsumerd_take_snapshot(struct lttng_kconsumerd_local_data *ctx,
-               struct lttng_kconsumerd_fd *kconsumerd_fd)
-{
-       int ret = 0;
-       int infd = kconsumerd_fd->consumerd_fd;
-
-       ret = kernctl_snapshot(infd);
-       if (ret != 0) {
-               ret = errno;
-               perror("Getting sub-buffer snapshot.");
-       }
-
-       return ret;
-}
-
-/*
- * Get the produced position
- *
- * Returns 0 on success, < 0 on error
- */
-int lttng_kconsumerd_get_produced_snapshot(
-               struct lttng_kconsumerd_local_data *ctx,
-               struct lttng_kconsumerd_fd *kconsumerd_fd,
-               unsigned long *pos)
-{
-       int ret;
-       int infd = kconsumerd_fd->consumerd_fd;
-
-       ret = kernctl_snapshot_get_produced(infd, pos);
-       if (ret != 0) {
-               ret = errno;
-               perror("kernctl_snapshot_get_produced");
-       }
-
-       return ret;
-}
-
-/*
- * Poll on the should_quit pipe and the command socket return -1 on error and
- * should exit, 0 if data is available on the command socket
- */
-int lttng_kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll)
-{
-       int num_rdy;
-
-       num_rdy = poll(kconsumerd_sockpoll, 2, -1);
-       if (num_rdy == -1) {
-               perror("Poll error");
-               goto exit;
-       }
-       if (kconsumerd_sockpoll[0].revents == POLLIN) {
-               DBG("kconsumerd_should_quit wake up");
-               goto exit;
-       }
-       return 0;
-
-exit:
-       return -1;
-}
-
-/*
- * This thread polls the fds in the ltt_fd_list to consume the data and write
- * it to tracefile if necessary.
- */
-void *lttng_kconsumerd_thread_poll_fds(void *data)
-{
-       int num_rdy, num_hup, high_prio, ret, i;
-       struct pollfd *pollfd = NULL;
-       /* local view of the fds */
-       struct lttng_kconsumerd_fd **local_kconsumerd_fd = NULL;
-       /* local view of kconsumerd_data.fds_count */
-       int nb_fd = 0;
-       char tmp;
-       int tmp2;
-       struct lttng_kconsumerd_local_data *ctx = data;
-
-
-       local_kconsumerd_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
-
-       while (1) {
-               high_prio = 0;
-               num_hup = 0;
-
-               /*
-                * the ltt_fd_list has been updated, we need to update our
-                * local array as well
-                */
-               pthread_mutex_lock(&kconsumerd_data.lock);
-               if (kconsumerd_data.need_update) {
-                       if (pollfd != NULL) {
-                               free(pollfd);
-                               pollfd = NULL;
-                       }
-                       if (local_kconsumerd_fd != NULL) {
-                               free(local_kconsumerd_fd);
-                               local_kconsumerd_fd = NULL;
-                       }
-
-                       /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
-                       pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
-                       if (pollfd == NULL) {
-                               perror("pollfd malloc");
-                               pthread_mutex_unlock(&kconsumerd_data.lock);
-                               goto end;
-                       }
-
-                       /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
-                       local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
-                                       sizeof(struct lttng_kconsumerd_fd));
-                       if (local_kconsumerd_fd == NULL) {
-                               perror("local_kconsumerd_fd malloc");
-                               pthread_mutex_unlock(&kconsumerd_data.lock);
-                               goto end;
-                       }
-                       ret = kconsumerd_update_poll_array(ctx, &pollfd, local_kconsumerd_fd);
-                       if (ret < 0) {
-                               ERR("Error in allocating pollfd or local_outfds");
-                               lttng_kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR);
-                               pthread_mutex_unlock(&kconsumerd_data.lock);
-                               goto end;
-                       }
-                       nb_fd = ret;
-                       kconsumerd_data.need_update = 0;
-               }
-               pthread_mutex_unlock(&kconsumerd_data.lock);
-
-               /* poll on the array of fds */
-               DBG("polling on %d fd", nb_fd + 1);
-               num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
-               DBG("poll num_rdy : %d", num_rdy);
-               if (num_rdy == -1) {
-                       perror("Poll error");
-                       lttng_kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR);
-                       goto end;
-               } else if (num_rdy == 0) {
-                       DBG("Polling thread timed out");
-                       goto end;
-               }
-
-               /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */
-               if (nb_fd == 0 && kconsumerd_quit == 1) {
-                       goto end;
-               }
-
-               /*
-                * If the kconsumerd_poll_pipe triggered poll go
-                * directly to the beginning of the loop to update the
-                * array. We want to prioritize array update over
-                * low-priority reads.
-                */
-               if (pollfd[nb_fd].revents == POLLIN) {
-                       DBG("kconsumerd_poll_pipe wake up");
-                       tmp2 = read(ctx->kconsumerd_poll_pipe[0], &tmp, 1);
-                       if (tmp2 < 0) {
-                               perror("read kconsumerd poll");
-                       }
-                       continue;
-               }
-
-               /* Take care of high priority channels first. */
-               for (i = 0; i < nb_fd; i++) {
-                       switch(pollfd[i].revents) {
-                       case POLLERR:
-                               ERR("Error returned in polling fd %d.", pollfd[i].fd);
-                               kconsumerd_del_fd(local_kconsumerd_fd[i]);
-                               num_hup++;
-                               break;
-                       case POLLHUP:
-                               DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
-                               kconsumerd_del_fd(local_kconsumerd_fd[i]);
-                               num_hup++;
-                               break;
-                       case POLLNVAL:
-                               ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
-                               kconsumerd_del_fd(local_kconsumerd_fd[i]);
-                               num_hup++;
-                               break;
-                       case POLLPRI:
-                               DBG("Urgent read on fd %d", pollfd[i].fd);
-                               high_prio = 1;
-                               ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]);
-                               /* it's ok to have an unavailable sub-buffer */
-                               if (ret == EAGAIN) {
-                                       ret = 0;
-                               }
-                               break;
-                       }
-               }
-
-               /* If every buffer FD has hung up, we end the read loop here */
-               if (nb_fd > 0 && num_hup == nb_fd) {
-                       DBG("every buffer FD has hung up\n");
-                       if (kconsumerd_quit == 1) {
-                               goto end;
-                       }
-                       continue;
-               }
-
-               /* Take care of low priority channels. */
-               if (high_prio == 0) {
-                       for (i = 0; i < nb_fd; i++) {
-                               if (pollfd[i].revents == POLLIN) {
-                                       DBG("Normal read on fd %d", pollfd[i].fd);
-                                       ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]);
-                                       /* it's ok to have an unavailable subbuffer */
-                                       if (ret == EAGAIN) {
-                                               ret = 0;
-                                       }
-                               }
-                       }
-               }
-       }
-end:
-       DBG("polling thread exiting");
-       if (pollfd != NULL) {
-               free(pollfd);
-               pollfd = NULL;
-       }
-       if (local_kconsumerd_fd != NULL) {
-               free(local_kconsumerd_fd);
-               local_kconsumerd_fd = NULL;
-       }
-       return NULL;
-}
-
-/*
- * Initialise the necessary environnement :
- * - create a new context
- * - create the poll_pipe
- * - create the should_quit pipe (for signal handler)
- * - create the thread pipe (for splice)
- *
- * Takes a function pointer as argument, this function is called when data is
- * available on a buffer. This function is responsible to do the
- * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
- * buffer configuration and then kernctl_put_next_subbuf at the end.
- *
- * Returns a pointer to the new context or NULL on error.
- */
-struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
-               int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
-               int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
-               int (*update_fd)(int sessiond_fd, uint32_t state))
-{
-       int ret, i;
-       struct lttng_kconsumerd_local_data *ctx;
-
-       ctx = malloc(sizeof(struct lttng_kconsumerd_local_data));
-       if (ctx == NULL) {
-               perror("allocating context");
-               goto error;
-       }
-
-       ctx->kconsumerd_error_socket = -1;
-       /* assign the callbacks */
-       ctx->on_buffer_ready = buffer_ready;
-       ctx->on_recv_fd = recv_fd;
-       ctx->on_update_fd = update_fd;
-
-       ret = pipe(ctx->kconsumerd_poll_pipe);
-       if (ret < 0) {
-               perror("Error creating poll pipe");
-               goto error_poll_pipe;
-       }
-
-       ret = pipe(ctx->kconsumerd_should_quit);
-       if (ret < 0) {
-               perror("Error creating recv pipe");
-               goto error_quit_pipe;
-       }
-
-       ret = pipe(ctx->kconsumerd_thread_pipe);
-       if (ret < 0) {
-               perror("Error creating thread pipe");
-               goto error_thread_pipe;
-       }
-
-       return ctx;
-
-
-error_thread_pipe:
-       for (i = 0; i < 2; i++) {
-               int err;
-
-               err = close(ctx->kconsumerd_should_quit[i]);
-               assert(!err);
-       }
-error_quit_pipe:
-       for (i = 0; i < 2; i++) {
-               int err;
-
-               err = close(ctx->kconsumerd_poll_pipe[i]);
-               assert(!err);
-       }
-error_poll_pipe:
-       free(ctx);
-error:
-       return NULL;
-}
-
-/*
- * Close all fds associated with the instance and free the context.
- */
-void lttng_kconsumerd_destroy(struct lttng_kconsumerd_local_data *ctx)
-{
-       close(ctx->kconsumerd_error_socket);
-       close(ctx->kconsumerd_thread_pipe[0]);
-       close(ctx->kconsumerd_thread_pipe[1]);
-       close(ctx->kconsumerd_poll_pipe[0]);
-       close(ctx->kconsumerd_poll_pipe[1]);
-       close(ctx->kconsumerd_should_quit[0]);
-       close(ctx->kconsumerd_should_quit[1]);
-       unlink(ctx->kconsumerd_command_sock_path);
-       free(ctx);
-       ctx = NULL;
-}
-
-/*
- * This thread listens on the consumerd socket and receives the file
- * descriptors from the session daemon.
- */
-void *lttng_kconsumerd_thread_receive_fds(void *data)
-{
-       int sock, client_socket, ret;
-       struct lttcomm_kconsumerd_header tmp;
-       /*
-        * structure to poll for incoming data on communication socket avoids
-        * making blocking sockets.
-        */
-       struct pollfd kconsumerd_sockpoll[2];
-       struct lttng_kconsumerd_local_data *ctx = data;
-
-
-       DBG("Creating command socket %s", ctx->kconsumerd_command_sock_path);
-       unlink(ctx->kconsumerd_command_sock_path);
-       client_socket = lttcomm_create_unix_sock(ctx->kconsumerd_command_sock_path);
-       if (client_socket < 0) {
-               ERR("Cannot create command socket");
-               goto end;
-       }
-
-       ret = lttcomm_listen_unix_sock(client_socket);
-       if (ret < 0) {
-               goto end;
-       }
-
-       DBG("Sending ready command to ltt-sessiond");
-       ret = lttng_kconsumerd_send_error(ctx, KCONSUMERD_COMMAND_SOCK_READY);
-       /* return < 0 on error, but == 0 is not fatal */
-       if (ret < 0) {
-               ERR("Error sending ready command to ltt-sessiond");
-               goto end;
-       }
-
-       ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               perror("fcntl O_NONBLOCK");
-               goto end;
-       }
-
-       /* prepare the FDs to poll : to client socket and the should_quit pipe */
-       kconsumerd_sockpoll[0].fd = ctx->kconsumerd_should_quit[0];
-       kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
-       kconsumerd_sockpoll[1].fd = client_socket;
-       kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
-
-       if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
-               goto end;
-       }
-       DBG("Connection on client_socket");
-
-       /* Blocking call, waiting for transmission */
-       sock = lttcomm_accept_unix_sock(client_socket);
-       if (sock <= 0) {
-               WARN("On accept");
-               goto end;
-       }
-       ret = fcntl(sock, F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               perror("fcntl O_NONBLOCK");
-               goto end;
-       }
-
-       /* update the polling structure to poll on the established socket */
-       kconsumerd_sockpoll[1].fd = sock;
-       kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
-
-       while (1) {
-               if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
-                       goto end;
-               }
-               DBG("Incoming fds on sock");
-
-               /* We first get the number of fd we are about to receive */
-               ret = lttcomm_recv_unix_sock(sock, &tmp,
-                               sizeof(struct lttcomm_kconsumerd_header));
-               if (ret <= 0) {
-                       ERR("Communication interrupted on command socket");
-                       goto end;
-               }
-               if (tmp.cmd_type == STOP) {
-                       DBG("Received STOP command");
-                       goto end;
-               }
-               if (kconsumerd_quit) {
-                       DBG("kconsumerd_thread_receive_fds received quit from signal");
-                       goto end;
-               }
-
-               /* we received a command to add or update fds */
-               ret = kconsumerd_consumerd_recv_fd(ctx, sock, kconsumerd_sockpoll,
-                               tmp.payload_size, tmp.cmd_type);
-               if (ret < 0) {
-                       ERR("Receiving the FD, exiting");
-                       goto end;
-               }
-               DBG("received fds on sock");
-       }
-
-end:
-       DBG("kconsumerd_thread_receive_fds exiting");
-
-       /*
-        * when all fds have hung up, the polling thread
-        * can exit cleanly
-        */
-       kconsumerd_quit = 1;
-
-       /*
-        * 2s of grace period, if no polling events occur during
-        * this period, the polling thread will exit even if there
-        * are still open FDs (should not happen, but safety mechanism).
-        */
-       kconsumerd_poll_timeout = LTTNG_KCONSUMERD_POLL_GRACE_PERIOD;
-
-       /* wake up the polling thread */
-       ret = write(ctx->kconsumerd_poll_pipe[1], "4", 1);
-       if (ret < 0) {
-               perror("poll pipe write");
-       }
-       return NULL;
-}
-
-/*
- * Close all the tracefiles and stream fds, should be called when all instances
- * are destroyed.
- */
-void lttng_kconsumerd_cleanup(void)
-{
-       struct lttng_kconsumerd_fd *iter, *tmp;
-
-       /*
-        * close all outfd. Called when there are no more threads
-        * running (after joining on the threads), no need to protect
-        * list iteration with mutex.
-        */
-       cds_list_for_each_entry_safe(iter, tmp,
-                       &kconsumerd_data.fd_list.head, list) {
-               kconsumerd_del_fd(iter);
-       }
-}
-
-/*
- * Called from signal handler.
- */
-void lttng_kconsumerd_should_exit(struct lttng_kconsumerd_local_data *ctx)
-{
-       int ret;
-       kconsumerd_quit = 1;
-       ret = write(ctx->kconsumerd_should_quit[1], "4", 1);
-       if (ret < 0) {
-               perror("write kconsumerd quit");
-       }
-}
-
-/*
- * Send return code to the session daemon.
- * If the socket is not defined, we return 0, it is not a fatal error
- */
-int lttng_kconsumerd_send_error(
-               struct lttng_kconsumerd_local_data *ctx, int cmd)
-{
-       if (ctx->kconsumerd_error_socket > 0) {
-               return lttcomm_send_unix_sock(ctx->kconsumerd_error_socket, &cmd,
-                               sizeof(enum lttcomm_sessiond_command));
-       }
-
-       return 0;
-}
diff --git a/libustctl/Makefile.am b/libustctl/Makefile.am
deleted file mode 100644 (file)
index c20e966..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/libustcomm
-AM_CFLAGS = -fno-strict-aliasing
-
-noinst_LTLIBRARIES = libustctl.la
-
-libustctl_la_SOURCES =
-
-libustctl_la_LIBADD = $(top_builddir)/libustcomm/libustcomm.la
-
diff --git a/ltt-kconsumerd/Makefile.am b/ltt-kconsumerd/Makefile.am
deleted file mode 100644 (file)
index 068c555..0000000
+++ /dev/null
@@ -1,10 +0,0 @@
-AM_CPPFLAGS = -I$(top_srcdir)/include
-
-bin_PROGRAMS = ltt-kconsumerd
-
-ltt_kconsumerd_SOURCES = ltt-kconsumerd.c
-
-ltt_kconsumerd_LDADD = \
-          $(top_builddir)/libkernelctl/libkernelctl.la \
-          $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la \
-          $(top_builddir)/liblttngkconsumerd/liblttngkconsumerd.la
diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c
deleted file mode 100644 (file)
index a81be55..0000000
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
- *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; only version 2
- * of the License.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
- */
-
-#define _GNU_SOURCE
-#include <fcntl.h>
-#include <getopt.h>
-#include <grp.h>
-#include <limits.h>
-#include <pthread.h>
-#include <signal.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/ipc.h>
-#include <sys/shm.h>
-#include <sys/socket.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <urcu/list.h>
-#include <poll.h>
-#include <unistd.h>
-#include <sys/mman.h>
-#include <assert.h>
-
-#include <ltt-kconsumerd.h>
-#include <lttng-kernel-ctl.h>
-#include <lttng-sessiond-comm.h>
-#include <lttng/lttng-kconsumerd.h>
-#include <lttngerr.h>
-
-/* the two threads (receive fd and poll) */
-static pthread_t threads[2];
-
-/* to count the number of time the user pressed ctrl+c */
-static int sigintcount = 0;
-
-/* Argument variables */
-int opt_quiet;
-int opt_verbose;
-static int opt_daemon;
-static const char *progname;
-static char command_sock_path[PATH_MAX]; /* Global command socket path */
-static char error_sock_path[PATH_MAX]; /* Global error path */
-
-/* the liblttngkconsumerd context */
-static struct lttng_kconsumerd_local_data *ctx;
-
-/*
- * Signal handler for the daemon
- */
-static void sighandler(int sig)
-{
-       if (sig == SIGINT && sigintcount++ == 0) {
-               DBG("ignoring first SIGINT");
-               return;
-       }
-
-       lttng_kconsumerd_should_exit(ctx);
-}
-
-/*
- * Setup signal handler for :
- *      SIGINT, SIGTERM, SIGPIPE
- */
-static int set_signal_handler(void)
-{
-       int ret = 0;
-       struct sigaction sa;
-       sigset_t sigset;
-
-       if ((ret = sigemptyset(&sigset)) < 0) {
-               perror("sigemptyset");
-               return ret;
-       }
-
-       sa.sa_handler = sighandler;
-       sa.sa_mask = sigset;
-       sa.sa_flags = 0;
-       if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
-               perror("sigaction");
-               return ret;
-       }
-
-       if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) {
-               perror("sigaction");
-               return ret;
-       }
-
-       if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
-               perror("sigaction");
-               return ret;
-       }
-
-       return ret;
-}
-
-/*
- * usage function on stderr
- */
-static void usage(void)
-{
-       fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
-       fprintf(stderr, "  -h, --help                         "
-                       "Display this usage.\n");
-       fprintf(stderr, "  -c, --kconsumerd-cmd-sock PATH     "
-                       "Specify path for the command socket\n");
-       fprintf(stderr, "  -e, --kconsumerd-err-sock PATH     "
-                       "Specify path for the error socket\n");
-       fprintf(stderr, "  -d, --daemonize                    "
-                       "Start as a daemon.\n");
-       fprintf(stderr, "  -q, --quiet                        "
-                       "No output at all.\n");
-       fprintf(stderr, "  -v, --verbose                      "
-                       "Verbose mode. Activate DBG() macro.\n");
-       fprintf(stderr, "  -V, --version                      "
-                       "Show version number.\n");
-}
-
-/*
- * daemon argument parsing
- */
-static void parse_args(int argc, char **argv)
-{
-       int c;
-
-       static struct option long_options[] = {
-               { "kconsumerd-cmd-sock", 1, 0, 'c' },
-               { "kconsumerd-err-sock", 1, 0, 'e' },
-               { "daemonize", 0, 0, 'd' },
-               { "help", 0, 0, 'h' },
-               { "quiet", 0, 0, 'q' },
-               { "verbose", 0, 0, 'v' },
-               { "version", 0, 0, 'V' },
-               { NULL, 0, 0, 0 }
-       };
-
-       while (1) {
-               int option_index = 0;
-               c = getopt_long(argc, argv, "dhqvV" "c:e:", long_options, &option_index);
-               if (c == -1) {
-                       break;
-               }
-
-               switch (c) {
-               case 0:
-                       fprintf(stderr, "option %s", long_options[option_index].name);
-                       if (optarg) {
-                               fprintf(stderr, " with arg %s\n", optarg);
-                       }
-                       break;
-               case 'c':
-                       snprintf(command_sock_path, PATH_MAX, "%s", optarg);
-                       break;
-               case 'e':
-                       snprintf(error_sock_path, PATH_MAX, "%s", optarg);
-                       break;
-               case 'd':
-                       opt_daemon = 1;
-                       break;
-               case 'h':
-                       usage();
-                       exit(EXIT_FAILURE);
-               case 'q':
-                       opt_quiet = 1;
-                       break;
-               case 'v':
-                       opt_verbose = 1;
-                       break;
-               case 'V':
-                       fprintf(stdout, "%s\n", VERSION);
-                       exit(EXIT_SUCCESS);
-               default:
-                       usage();
-                       exit(EXIT_FAILURE);
-               }
-       }
-}
-
-/*
- * Consume data on a file descriptor and write it on a trace file.
- */
-static int read_subbuffer(struct lttng_kconsumerd_fd *kconsumerd_fd)
-{
-       unsigned long len;
-       int err;
-       long ret = 0;
-       int infd = kconsumerd_fd->consumerd_fd;
-
-       DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
-       /* Get the next subbuffer */
-       err = kernctl_get_next_subbuf(infd);
-       if (err != 0) {
-               ret = errno;
-               /*
-                * This is a debug message even for single-threaded consumer,
-                * because poll() have more relaxed criterions than get subbuf,
-                * so get_subbuf may fail for short race windows where poll()
-                * would issue wakeups.
-                */
-               DBG("Reserving sub buffer failed (everything is normal, "
-                               "it is due to concurrency)");
-               goto end;
-       }
-
-       switch (kconsumerd_fd->output) {
-               case LTTNG_EVENT_SPLICE:
-                       /* read the whole subbuffer */
-                       err = kernctl_get_padded_subbuf_size(infd, &len);
-                       if (err != 0) {
-                               ret = errno;
-                               perror("Getting sub-buffer len failed.");
-                               goto end;
-                       }
-
-                       /* splice the subbuffer to the tracefile */
-                       ret = lttng_kconsumerd_on_read_subbuffer_splice(ctx, kconsumerd_fd, len);
-                       if (ret < 0) {
-                               /*
-                                * display the error but continue processing to try
-                                * to release the subbuffer
-                                */
-                               ERR("Error splicing to tracefile");
-                       }
-                       break;
-               case LTTNG_EVENT_MMAP:
-                       /* read the used subbuffer size */
-                       err = kernctl_get_padded_subbuf_size(infd, &len);
-                       if (err != 0) {
-                               ret = errno;
-                               perror("Getting sub-buffer len failed.");
-                               goto end;
-                       }
-                       /* write the subbuffer to the tracefile */
-                       ret = lttng_kconsumerd_on_read_subbuffer_mmap(ctx, kconsumerd_fd, len);
-                       if (ret < 0) {
-                               /*
-                                * display the error but continue processing to try
-                                * to release the subbuffer
-                                */
-                               ERR("Error writing to tracefile");
-                       }
-                       break;
-               default:
-                       ERR("Unknown output method");
-                       ret = -1;
-       }
-
-       err = kernctl_put_next_subbuf(infd);
-       if (err != 0) {
-               ret = errno;
-               if (errno == EFAULT) {
-                       perror("Error in unreserving sub buffer\n");
-               } else if (errno == EIO) {
-                       /* Should never happen with newer LTTng versions */
-                       perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
-               }
-               goto end;
-       }
-
-end:
-       return ret;
-}
-
-static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd)
-{
-       int ret;
-
-       /* Opening the tracefile in write mode */
-       if (kconsumerd_fd->path_name != NULL) {
-               ret = open(kconsumerd_fd->path_name,
-                               O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
-               if (ret < 0) {
-                       ERR("Opening %s", kconsumerd_fd->path_name);
-                       perror("open");
-                       goto error;
-               }
-               kconsumerd_fd->out_fd = ret;
-       }
-
-       if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) {
-               /* get the len of the mmap region */
-               unsigned long mmap_len;
-
-               ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, &mmap_len);
-               if (ret != 0) {
-                       ret = errno;
-                       perror("kernctl_get_mmap_len");
-                       goto error_close_fd;
-               }
-               kconsumerd_fd->mmap_len = (size_t) mmap_len;
-
-               kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len,
-                               PROT_READ, MAP_PRIVATE, kconsumerd_fd->consumerd_fd, 0);
-               if (kconsumerd_fd->mmap_base == MAP_FAILED) {
-                       perror("Error mmaping");
-                       ret = -1;
-                       goto error_close_fd;
-               }
-       }
-
-       /* we return 0 to let the library handle the FD internally */
-       return 0;
-
-error_close_fd:
-       {
-               int err;
-
-               err = close(kconsumerd_fd->out_fd);
-               assert(!err);
-       }
-error:
-       return ret;
-}
-
-/*
- * main
- */
-int main(int argc, char **argv)
-{
-       int i;
-       int ret = 0;
-       void *status;
-
-       /* Parse arguments */
-       progname = argv[0];
-       parse_args(argc, argv);
-
-       /* Daemonize */
-       if (opt_daemon) {
-               ret = daemon(0, 0);
-               if (ret < 0) {
-                       perror("daemon");
-                       goto error;
-               }
-       }
-
-       if (strlen(command_sock_path) == 0) {
-               snprintf(command_sock_path, PATH_MAX,
-                               KCONSUMERD_CMD_SOCK_PATH);
-       }
-       /* create the consumer instance with and assign the callbacks */
-       ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL);
-       if (ctx == NULL) {
-               goto error;
-       }
-
-       lttng_kconsumerd_set_command_sock_path(ctx, command_sock_path);
-       if (strlen(error_sock_path) == 0) {
-               snprintf(error_sock_path, PATH_MAX,
-                               KCONSUMERD_ERR_SOCK_PATH);
-       }
-
-       if (set_signal_handler() < 0) {
-               goto error;
-       }
-
-       /* Connect to the socket created by ltt-sessiond to report errors */
-       DBG("Connecting to error socket %s", error_sock_path);
-       ret = lttcomm_connect_unix_sock(error_sock_path);
-       /* not a fatal error, but all communication with ltt-sessiond will fail */
-       if (ret < 0) {
-               WARN("Cannot connect to error socket, is ltt-sessiond started ?");
-       }
-       lttng_kconsumerd_set_error_sock(ctx, ret);
-
-       /* Create the thread to manage the receive of fd */
-       ret = pthread_create(&threads[0], NULL, lttng_kconsumerd_thread_receive_fds,
-                       (void *) ctx);
-       if (ret != 0) {
-               perror("pthread_create");
-               goto error;
-       }
-
-       /* Create thread to manage the polling/writing of traces */
-       ret = pthread_create(&threads[1], NULL, lttng_kconsumerd_thread_poll_fds,
-                       (void *) ctx);
-       if (ret != 0) {
-               perror("pthread_create");
-               goto error;
-       }
-
-       for (i = 0; i < 2; i++) {
-               ret = pthread_join(threads[i], &status);
-               if (ret != 0) {
-                       perror("pthread_join");
-                       goto error;
-               }
-       }
-       ret = EXIT_SUCCESS;
-       lttng_kconsumerd_send_error(ctx, KCONSUMERD_EXIT_SUCCESS);
-       goto end;
-
-error:
-       ret = EXIT_FAILURE;
-       lttng_kconsumerd_send_error(ctx, KCONSUMERD_EXIT_FAILURE);
-
-end:
-       lttng_kconsumerd_destroy(ctx);
-       lttng_kconsumerd_cleanup();
-
-       return ret;
-}
index cdcdfc60e83dad3430cfc85375d711432265aa5f..0d284efb1ee4cc27f6155a24cdf29a7c8d3ca9d4 100644 (file)
@@ -14,11 +14,8 @@ endif
 ltt_sessiond_SOURCES = utils.c utils.h \
                        compat/poll.h $(COMPAT) \
                        trace-kernel.c trace-kernel.h \
-                       trace-ust.c trace-ust.h \
-                       ust-app.c ust-app.h \
-                       ust-comm.c ust-comm.h \
-                       ust-ctl.c ust-ctl.h \
                        kernel-ctl.c kernel-ctl.h \
+                      ust-ctl.h ust-app.h trace-ust.h \
                        context.c context.h \
                        channel.c channel.h \
                        event.c event.h \
@@ -29,9 +26,16 @@ ltt_sessiond_SOURCES = utils.c utils.h \
                        ../hashtable/rculfhash.c \
                        ../hashtable/rculfhash.h
 
+if LTTNG_TOOLS_HAVE_UST
+ltt_sessiond_SOURCES += \
+                       trace-ust.c \
+                       ust-app.c \
+                       ust-comm.c ust-comm.h \
+                       ust-ctl.c
+endif
+
 # link on liblttngctl for check if sessiond is already alive.
 ltt_sessiond_LDADD = -lrt \
                 $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la \
                 $(top_builddir)/libkernelctl/libkernelctl.la \
-                $(top_builddir)/libustctl/libustctl.la \
                 $(top_builddir)/liblttngctl/liblttngctl.la
index 431c5eb0d5ecd8a5f2980db4fbe1a3440b74c5fa..e909b603c6bdb75fd47869548c9e1d70ec8696e3 100644 (file)
@@ -21,6 +21,7 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
+#include <config.h>
 
 #include <lttngerr.h>
 
index 9c270edb3325674be28ed3886c646b8267ba0022..203c01050e82199f34128d09b9c5dd95ca6fb0a4 100644 (file)
@@ -122,7 +122,7 @@ int kernel_create_session(struct ltt_session *session, int tracer_fd)
                perror("fcntl session fd");
        }
 
-       lks->kconsumer_fds_sent = 0;
+       lks->consumer_fds_sent = 0;
        session->kernel_session = lks;
 
        DBG("Kernel session created (fd: %d)", lks->fd);
diff --git a/ltt-sessiond/lttng-ust-abi.h b/ltt-sessiond/lttng-ust-abi.h
new file mode 100644 (file)
index 0000000..303493e
--- /dev/null
@@ -0,0 +1,137 @@
+#ifndef _LTTNG_UST_ABI_H
+#define _LTTNG_UST_ABI_H
+
+/*
+ * lttng-ust-abi.h
+ *
+ * Copyright 2010-2011 (c) - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * LTTng-UST ABI header
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <stdint.h>
+
+#define LTTNG_UST_SYM_NAME_LEN 128
+
+#define LTTNG_UST_COMM_VERSION_MAJOR           0
+#define LTTNG_UST_COMM_VERSION_MINOR           1
+
+enum lttng_ust_instrumentation {
+       LTTNG_UST_TRACEPOINT    = 0,
+       LTTNG_UST_PROBE         = 1,
+       LTTNG_UST_FUNCTION      = 2,
+};
+
+enum lttng_ust_output {
+       LTTNG_UST_MMAP          = 0,
+};
+
+struct lttng_ust_tracer_version {
+       uint32_t version;
+       uint32_t patchlevel;
+       uint32_t sublevel;
+};
+
+struct lttng_ust_channel {
+       int overwrite;                          /* 1: overwrite, 0: discard */
+       uint64_t subbuf_size;                   /* in bytes */
+       uint64_t num_subbuf;
+       unsigned int switch_timer_interval;     /* usecs */
+       unsigned int read_timer_interval;       /* usecs */
+       enum lttng_ust_output output;           /* output mode */
+       /* The following fields are used internally within UST. */
+       int shm_fd;
+       int wait_fd;
+       uint64_t memory_map_size;
+};
+
+/*
+ * This structure is only used internally within UST. It is not per-se
+ * part of the communication between sessiond and UST.
+ */
+struct lttng_ust_stream {
+       int shm_fd;
+       int wait_fd;
+       uint64_t memory_map_size;
+};
+
+struct lttng_ust_event {
+       char name[LTTNG_UST_SYM_NAME_LEN];      /* event name */
+       enum lttng_ust_instrumentation instrumentation;
+       /* Per instrumentation type configuration */
+       union {
+       } u;
+};
+
+enum lttng_ust_context_type {
+       LTTNG_UST_CONTEXT_VTID                  = 0,
+       LTTNG_UST_CONTEXT_VPID                  = 1,
+       LTTNG_UST_CONTEXT_PTHREAD_ID            = 2,
+       LTTNG_UST_CONTEXT_PROCNAME              = 3,
+};
+
+struct lttng_ust_context {
+       enum lttng_ust_context_type ctx;
+       union {
+       } u;
+};
+
+#define _UST_CMD(minor)                                (minor)
+#define _UST_CMDR(minor, type)                 (minor)
+#define _UST_CMDW(minor, type)                 (minor)
+
+/* Handled by object descriptor */
+#define LTTNG_UST_RELEASE                      _UST_CMD(0x1)
+
+/* Handled by object cmd */
+
+/* LTTng-UST commands */
+#define LTTNG_UST_SESSION                      _UST_CMD(0x40)
+#define LTTNG_UST_TRACER_VERSION               \
+       _UST_CMDR(0x41, struct lttng_ust_tracer_version)
+#define LTTNG_UST_TRACEPOINT_LIST              _UST_CMD(0x42)
+#define LTTNG_UST_WAIT_QUIESCENT               _UST_CMD(0x43)
+#define LTTNG_UST_REGISTER_DONE                        _UST_CMD(0x44)
+
+/* Session FD commands */
+#define LTTNG_UST_METADATA                     \
+       _UST_CMDW(0x50, struct lttng_ust_channel)
+#define LTTNG_UST_CHANNEL                      \
+       _UST_CMDW(0x51, struct lttng_ust_channel)
+#define LTTNG_UST_SESSION_START                        _UST_CMD(0x52)
+#define LTTNG_UST_SESSION_STOP                 _UST_CMD(0x53)
+
+/* Channel FD commands */
+#define LTTNG_UST_STREAM                       _UST_CMD(0x60)
+#define LTTNG_UST_EVENT                        \
+       _UST_CMDW(0x61, struct lttng_ust_event)
+
+/* Event and Channel FD commands */
+#define LTTNG_UST_CONTEXT                      \
+       _UST_CMDW(0x70, struct lttng_ust_context)
+
+/* Event, Channel and Session commands */
+#define LTTNG_UST_ENABLE                       _UST_CMD(0x80)
+#define LTTNG_UST_DISABLE                      _UST_CMD(0x81)
+
+#define LTTNG_UST_ROOT_HANDLE  0
+
+struct obj;
+
+struct objd_ops {
+       long (*cmd)(int objd, unsigned int cmd, unsigned long arg);
+       int (*release)(int objd);
+};
+
+/* Create root handle. Always ID 0. */
+int lttng_abi_create_root_handle(void);
+
+const struct objd_ops *objd_ops(int id);
+int objd_unref(int id);
+
+void lttng_ust_abi_exit(void);
+void ltt_events_exit(void);
+
+#endif /* _LTTNG_UST_ABI_H */
index f887fa12441d4b94ae74c5710b44d6850a791c23..5fec7ae4e8cc2f3070ad5e151cd40d9d8a2c857d 100644 (file)
 #include <sys/wait.h>
 #include <urcu/futex.h>
 #include <unistd.h>
+#include <config.h>
 
-#include <ltt-kconsumerd.h>
+#include <lttng-consumerd.h>
 #include <lttng-sessiond-comm.h>
-#include <lttng/lttng-kconsumerd.h>
+#include <lttng/lttng-consumer.h>
 #include <lttngerr.h>
 
 #include "channel.h"
 #include "utils.h"
 #include "ust-ctl.h"
 
+struct consumer_data {
+       enum lttng_consumer_type type;
+
+       pthread_t thread;       /* Worker thread interacting with the consumer */
+       sem_t sem;
+
+       /* Mutex to control consumerd pid assignation */
+       pthread_mutex_t pid_mutex;
+       pid_t pid;
+
+       int err_sock;
+       int cmd_sock;
+
+       /* consumer error and command Unix socket path */
+       char err_unix_sock_path[PATH_MAX];
+       char cmd_unix_sock_path[PATH_MAX];
+};
+
 /* Const values */
 const char default_home_dir[] = DEFAULT_HOME_DIR;
 const char default_tracing_group[] = LTTNG_DEFAULT_TRACING_GROUP;
@@ -63,7 +82,7 @@ const char default_global_apps_pipe[] = DEFAULT_GLOBAL_APPS_PIPE;
 
 /* Variables */
 int opt_verbose;    /* Not static for lttngerr.h */
-int opt_verbose_kconsumerd;    /* Not static for lttngerr.h */
+int opt_verbose_consumer;    /* Not static for lttngerr.h */
 int opt_quiet;      /* Not static for lttngerr.h */
 
 const char *progname;
@@ -72,24 +91,27 @@ static int opt_sig_parent;
 static int opt_daemon;
 static int is_root;                    /* Set to 1 if the daemon is running as root */
 static pid_t ppid;          /* Parent PID for --sig-parent option */
-static pid_t kconsumerd_pid;
+
+/* Consumer daemon specific control data */
+static struct consumer_data kconsumer_data = {
+       .type = LTTNG_CONSUMER_KERNEL,
+};
+static struct consumer_data ustconsumer_data = {
+       .type = LTTNG_CONSUMER_UST,
+};
+
 static int dispatch_thread_exit;
 
 /* Global application Unix socket path */
 static char apps_unix_sock_path[PATH_MAX];
 /* Global client Unix socket path */
 static char client_unix_sock_path[PATH_MAX];
-/* kconsumerd error and command Unix socket path */
-static char kconsumerd_err_unix_sock_path[PATH_MAX];
-static char kconsumerd_cmd_unix_sock_path[PATH_MAX];
 /* global wait shm path for UST */
 static char wait_shm_path[PATH_MAX];
 
 /* Sockets and FDs */
 static int client_sock;
 static int apps_sock;
-static int kconsumerd_err_sock;
-static int kconsumerd_cmd_sock;
 static int kernel_tracer_fd;
 static int kernel_poll_pipe[2];
 
@@ -106,17 +128,12 @@ static int thread_quit_pipe[2];
 static int apps_cmd_pipe[2];
 
 /* Pthread, Mutexes and Semaphores */
-static pthread_t kconsumerd_thread;
 static pthread_t apps_thread;
 static pthread_t reg_apps_thread;
 static pthread_t client_thread;
 static pthread_t kernel_thread;
 static pthread_t dispatch_thread;
-static sem_t kconsumerd_sem;
-
 
-/* Mutex to control kconsumerd pid assignation */
-static pthread_mutex_t kconsumerd_pid_mutex;
 
 /*
  * UST registration command queue. This queue is tied with a futex and uses a N
@@ -269,7 +286,7 @@ static void teardown_kernel_session(struct ltt_session *session)
                 * If a custom kernel consumer was registered, close the socket before
                 * tearing down the complete kernel session structure
                 */
-               if (session->kernel_session->consumer_fd != kconsumerd_cmd_sock) {
+               if (session->kernel_session->consumer_fd != kconsumer_data.cmd_sock) {
                        lttcomm_close_unix_sock(session->kernel_session->consumer_fd);
                }
 
@@ -346,7 +363,7 @@ static void cleanup(void)
        DBG("Closing all UST sockets");
        ust_app_clean_list();
 
-       pthread_mutex_destroy(&kconsumerd_pid_mutex);
+       pthread_mutex_destroy(&kconsumer_data.pid_mutex);
 
        DBG("Closing kernel fd");
        close(kernel_tracer_fd);
@@ -396,54 +413,57 @@ static void clean_command_ctx(struct command_ctx **cmd_ctx)
 /*
  * Send all stream fds of kernel channel to the consumer.
  */
-static int send_kconsumerd_channel_fds(int sock,
-               struct ltt_kernel_channel *channel)
+static int send_consumer_channel_streams(struct consumer_data *consumer_data,
+               int sock, struct ltt_kernel_channel *channel)
 {
        int ret;
        size_t nb_fd;
        struct ltt_kernel_stream *stream;
-       struct lttcomm_kconsumerd_header lkh;
-       struct lttcomm_kconsumerd_msg lkm;
+       struct lttcomm_consumer_msg lkm;
 
-       DBG("Sending fds of channel %s to kernel consumer",
+       DBG("Sending streams of channel %s to kernel consumer",
                        channel->channel->name);
-
        nb_fd = channel->stream_count;
 
-       /* Setup header */
-       lkh.payload_size = nb_fd * sizeof(struct lttcomm_kconsumerd_msg);
-       lkh.cmd_type = ADD_STREAM;
-
-       DBG("Sending kconsumerd header");
-
-       ret = lttcomm_send_unix_sock(sock, &lkh,
-                       sizeof(struct lttcomm_kconsumerd_header));
+       /* Send channel */
+       lkm.cmd_type = LTTNG_CONSUMER_ADD_CHANNEL;
+       lkm.u.channel.channel_key = channel->fd;
+       lkm.u.channel.max_sb_size = channel->channel->attr.subbuf_size;
+       lkm.u.channel.mmap_len = 0;     /* for kernel */
+       DBG("Sending channel %d to consumer", lkm.u.stream.stream_key);
+       ret = lttcomm_send_unix_sock(sock, &lkm, sizeof(lkm));
        if (ret < 0) {
-               perror("send kconsumerd header");
+               perror("send consumer channel");
                goto error;
        }
 
+       /* Send streams */
        cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
-               if (stream->fd != 0) {
-                       lkm.fd = stream->fd;
-                       lkm.state = stream->state;
-                       lkm.max_sb_size = channel->channel->attr.subbuf_size;
-                       lkm.output = channel->channel->attr.output;
-                       strncpy(lkm.path_name, stream->pathname, PATH_MAX);
-                       lkm.path_name[PATH_MAX - 1] = '\0';
-
-                       DBG("Sending fd %d to kconsumerd", lkm.fd);
-
-                       ret = lttcomm_send_fds_unix_sock(sock, &lkm,
-                                       &lkm.fd, 1, sizeof(lkm));
-                       if (ret < 0) {
-                               perror("send kconsumerd fd");
-                               goto error;
-                       }
+               if (!stream->fd) {
+                       continue;
+               }
+               lkm.cmd_type = LTTNG_CONSUMER_ADD_STREAM;
+               lkm.u.stream.channel_key = channel->fd;
+               lkm.u.stream.stream_key = stream->fd;
+               lkm.u.stream.state = stream->state;
+               lkm.u.stream.output = channel->channel->attr.output;
+               lkm.u.stream.mmap_len = 0;      /* for kernel */
+               strncpy(lkm.u.stream.path_name, stream->pathname, PATH_MAX - 1);
+               lkm.u.stream.path_name[PATH_MAX - 1] = '\0';
+               DBG("Sending stream %d to consumer", lkm.u.stream.stream_key);
+               ret = lttcomm_send_unix_sock(sock, &lkm, sizeof(lkm));
+               if (ret < 0) {
+                       perror("send consumer stream");
+                       goto error;
+               }
+               ret = lttcomm_send_fds_unix_sock(sock, &stream->fd, 1);
+               if (ret < 0) {
+                       perror("send consumer stream ancillary data");
+                       goto error;
                }
        }
 
-       DBG("Kconsumerd channel fds sent");
+       DBG("consumer channel streams sent");
 
        return 0;
 
@@ -454,58 +474,64 @@ error:
 /*
  * Send all stream fds of the kernel session to the consumer.
  */
-static int send_kconsumerd_fds(struct ltt_kernel_session *session)
+static int send_consumer_session_streams(struct consumer_data *consumer_data,
+               struct ltt_kernel_session *session)
 {
        int ret;
        struct ltt_kernel_channel *chan;
-       struct lttcomm_kconsumerd_header lkh;
-       struct lttcomm_kconsumerd_msg lkm;
-
-       /* Setup header */
-       lkh.payload_size = sizeof(struct lttcomm_kconsumerd_msg);
-       lkh.cmd_type = ADD_STREAM;
-
-       DBG("Sending kconsumerd header for metadata");
-
-       ret = lttcomm_send_unix_sock(session->consumer_fd, &lkh,
-                       sizeof(struct lttcomm_kconsumerd_header));
-       if (ret < 0) {
-               perror("send kconsumerd header");
-               goto error;
-       }
+       struct lttcomm_consumer_msg lkm;
+       int sock = session->consumer_fd;
 
        DBG("Sending metadata stream fd");
 
        /* Extra protection. It's NOT suppose to be set to 0 at this point */
        if (session->consumer_fd == 0) {
-               session->consumer_fd = kconsumerd_cmd_sock;
+               session->consumer_fd = consumer_data->cmd_sock;
        }
 
        if (session->metadata_stream_fd != 0) {
-               /* Send metadata stream fd first */
-               lkm.fd = session->metadata_stream_fd;
-               lkm.state = ACTIVE_FD;
-               lkm.max_sb_size = session->metadata->conf->attr.subbuf_size;
-               lkm.output = DEFAULT_KERNEL_CHANNEL_OUTPUT;
-               strncpy(lkm.path_name, session->metadata->pathname, PATH_MAX);
-               lkm.path_name[PATH_MAX - 1] = '\0';
-
-               ret = lttcomm_send_fds_unix_sock(session->consumer_fd, &lkm,
-                               &lkm.fd, 1, sizeof(lkm));
+               /* Send metadata channel fd */
+               lkm.cmd_type = LTTNG_CONSUMER_ADD_CHANNEL;
+               lkm.u.channel.channel_key = session->metadata->fd;
+               lkm.u.channel.max_sb_size = session->metadata->conf->attr.subbuf_size;
+               lkm.u.channel.mmap_len = 0;     /* for kernel */
+               DBG("Sending metadata channel %d to consumer", lkm.u.stream.stream_key);
+               ret = lttcomm_send_unix_sock(sock, &lkm, sizeof(lkm));
+               if (ret < 0) {
+                       perror("send consumer channel");
+                       goto error;
+               }
+
+               /* Send metadata stream fd */
+               lkm.cmd_type = LTTNG_CONSUMER_ADD_STREAM;
+               lkm.u.stream.channel_key = session->metadata->fd;
+               lkm.u.stream.stream_key = session->metadata_stream_fd;
+               lkm.u.stream.state = LTTNG_CONSUMER_ACTIVE_STREAM;
+               lkm.u.stream.output = DEFAULT_KERNEL_CHANNEL_OUTPUT;
+               lkm.u.stream.mmap_len = 0;      /* for kernel */
+               strncpy(lkm.u.stream.path_name, session->metadata->pathname, PATH_MAX - 1);
+               lkm.u.stream.path_name[PATH_MAX - 1] = '\0';
+               DBG("Sending metadata stream %d to consumer", lkm.u.stream.stream_key);
+               ret = lttcomm_send_unix_sock(sock, &lkm, sizeof(lkm));
+               if (ret < 0) {
+                       perror("send consumer stream");
+                       goto error;
+               }
+               ret = lttcomm_send_fds_unix_sock(sock, &session->metadata_stream_fd, 1);
                if (ret < 0) {
-                       perror("send kconsumerd fd");
+                       perror("send consumer stream");
                        goto error;
                }
        }
 
        cds_list_for_each_entry(chan, &session->channel_list.head, list) {
-               ret = send_kconsumerd_channel_fds(session->consumer_fd, chan);
+               ret = send_consumer_channel_streams(consumer_data, sock, chan);
                if (ret < 0) {
                        goto error;
                }
        }
 
-       DBG("Kconsumerd fds (metadata and channel streams) sent");
+       DBG("consumer fds (metadata and channel streams) sent");
 
        return 0;
 
@@ -618,7 +644,7 @@ error:
  *
  * Useful for CPU hotplug feature.
  */
-static int update_kernel_stream(int fd)
+static int update_stream(struct consumer_data *consumer_data, int fd)
 {
        int ret = 0;
        struct ltt_session *session;
@@ -636,7 +662,7 @@ static int update_kernel_stream(int fd)
 
                /* This is not suppose to be 0 but this is an extra security check */
                if (session->kernel_session->consumer_fd == 0) {
-                       session->kernel_session->consumer_fd = kconsumerd_cmd_sock;
+                       session->kernel_session->consumer_fd = consumer_data->cmd_sock;
                }
 
                cds_list_for_each_entry(channel,
@@ -653,8 +679,8 @@ static int update_kernel_stream(int fd)
                                 * that tracing is started so it is safe to send our updated
                                 * stream fds.
                                 */
-                               if (session->kernel_session->kconsumer_fds_sent == 1) {
-                                       ret = send_kconsumerd_channel_fds(
+                               if (session->kernel_session->consumer_fds_sent == 1) {
+                                       ret = send_consumer_channel_streams(consumer_data,
                                                        session->kernel_session->consumer_fd, channel);
                                        if (ret < 0) {
                                                goto error;
@@ -754,7 +780,7 @@ static void *thread_manage_kernel(void *data)
                                 * kernel session and updating the kernel consumer
                                 */
                                if (revents & LPOLLIN) {
-                                       ret = update_kernel_stream(pollfd);
+                                       ret = update_stream(&kconsumer_data, pollfd);
                                        if (ret < 0) {
                                                continue;
                                        }
@@ -779,18 +805,19 @@ error:
 }
 
 /*
- * This thread manage the kconsumerd error sent back to the session daemon.
+ * This thread manage the consumer error sent back to the session daemon.
  */
-static void *thread_manage_kconsumerd(void *data)
+static void *thread_manage_consumer(void *data)
 {
        int sock = 0, i, ret, pollfd;
        uint32_t revents, nb_fd;
        enum lttcomm_return_code code;
        struct lttng_poll_event events;
+       struct consumer_data *consumer_data = data;
 
-       DBG("[thread] Manage kconsumerd started");
+       DBG("[thread] Manage consumer started");
 
-       ret = lttcomm_listen_unix_sock(kconsumerd_err_sock);
+       ret = lttcomm_listen_unix_sock(consumer_data->err_sock);
        if (ret < 0) {
                goto error;
        }
@@ -804,7 +831,7 @@ static void *thread_manage_kconsumerd(void *data)
                goto error;
        }
 
-       ret = lttng_poll_add(&events, kconsumerd_err_sock, LPOLLIN | LPOLLRDHUP);
+       ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
        if (ret < 0) {
                goto error;
        }
@@ -829,20 +856,20 @@ static void *thread_manage_kconsumerd(void *data)
                }
 
                /* Event on the registration socket */
-               if (pollfd == kconsumerd_err_sock) {
+               if (pollfd == consumer_data->err_sock) {
                        if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("Kconsumerd err socket poll error");
+                               ERR("consumer err socket poll error");
                                goto error;
                        }
                }
        }
 
-       sock = lttcomm_accept_unix_sock(kconsumerd_err_sock);
+       sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
        if (sock < 0) {
                goto error;
        }
 
-       DBG2("Receiving code from kconsumerd_err_sock");
+       DBG2("Receiving code from consumer err_sock");
 
        /* Getting status code from kconsumerd */
        ret = lttcomm_recv_unix_sock(sock, &code,
@@ -851,25 +878,25 @@ static void *thread_manage_kconsumerd(void *data)
                goto error;
        }
 
-       if (code == KCONSUMERD_COMMAND_SOCK_READY) {
-               kconsumerd_cmd_sock =
-                       lttcomm_connect_unix_sock(kconsumerd_cmd_unix_sock_path);
-               if (kconsumerd_cmd_sock < 0) {
-                       sem_post(&kconsumerd_sem);
-                       perror("kconsumerd connect");
+       if (code == CONSUMERD_COMMAND_SOCK_READY) {
+               consumer_data->cmd_sock =
+                       lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
+               if (consumer_data->cmd_sock < 0) {
+                       sem_post(&consumer_data->sem);
+                       perror("consumer connect");
                        goto error;
                }
                /* Signal condition to tell that the kconsumerd is ready */
-               sem_post(&kconsumerd_sem);
-               DBG("Kconsumerd command socket ready");
+               sem_post(&consumer_data->sem);
+               DBG("consumer command socket ready");
        } else {
-               ERR("Kconsumerd error when waiting for SOCK_READY : %s",
+               ERR("consumer error when waiting for SOCK_READY : %s",
                                lttcomm_get_readable_code(-code));
                goto error;
        }
 
        /* Remove the kconsumerd error sock since we've established a connexion */
-       ret = lttng_poll_del(&events, kconsumerd_err_sock);
+       ret = lttng_poll_del(&events, consumer_data->err_sock);
        if (ret < 0) {
                goto error;
        }
@@ -902,7 +929,7 @@ static void *thread_manage_kconsumerd(void *data)
                /* Event on the kconsumerd socket */
                if (pollfd == sock) {
                        if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("Kconsumerd err socket second poll error");
+                               ERR("consumer err socket second poll error");
                                goto error;
                        }
                }
@@ -912,21 +939,21 @@ static void *thread_manage_kconsumerd(void *data)
        ret = lttcomm_recv_unix_sock(sock, &code,
                        sizeof(enum lttcomm_return_code));
        if (ret <= 0) {
-               ERR("Kconsumerd closed the command socket");
+               ERR("consumer closed the command socket");
                goto error;
        }
 
-       ERR("Kconsumerd return code : %s", lttcomm_get_readable_code(-code));
+       ERR("consumer return code : %s", lttcomm_get_readable_code(-code));
 
 error:
-       DBG("Kconsumerd thread dying");
-       close(kconsumerd_err_sock);
-       close(kconsumerd_cmd_sock);
+       DBG("consumer thread dying");
+       close(consumer_data->err_sock);
+       close(consumer_data->cmd_sock);
        close(sock);
 
-       unlink(kconsumerd_err_unix_sock_path);
-       unlink(kconsumerd_cmd_unix_sock_path);
-       kconsumerd_pid = 0;
+       unlink(consumer_data->err_unix_sock_path);
+       unlink(consumer_data->cmd_unix_sock_path);
+       consumer_data->pid = 0;
 
        lttng_poll_clean(&events);
 
@@ -1259,10 +1286,10 @@ error:
 }
 
 /*
- * Start the thread_manage_kconsumerd. This must be done after a kconsumerd
+ * Start the thread_manage_consumer. This must be done after a lttng-consumerd
  * exec or it will fails.
  */
-static int spawn_kconsumerd_thread(void)
+static int spawn_consumer_thread(struct consumer_data *consumer_data)
 {
        int ret;
        struct timespec timeout;
@@ -1271,16 +1298,16 @@ static int spawn_kconsumerd_thread(void)
        timeout.tv_nsec = 0;
 
        /* Setup semaphore */
-       ret = sem_init(&kconsumerd_sem, 0, 0);
+       ret = sem_init(&consumer_data->sem, 0, 0);
        if (ret < 0) {
-               PERROR("sem_init kconsumerd_sem");
+               PERROR("sem_init consumer semaphore");
                goto error;
        }
 
-       ret = pthread_create(&kconsumerd_thread, NULL,
-                       thread_manage_kconsumerd, (void *) NULL);
+       ret = pthread_create(&consumer_data->thread, NULL,
+                       thread_manage_consumer, consumer_data);
        if (ret != 0) {
-               PERROR("pthread_create kconsumerd");
+               PERROR("pthread_create consumer");
                ret = -1;
                goto error;
        }
@@ -1288,13 +1315,13 @@ static int spawn_kconsumerd_thread(void)
        /* Get time for sem_timedwait absolute timeout */
        ret = clock_gettime(CLOCK_REALTIME, &timeout);
        if (ret < 0) {
-               PERROR("clock_gettime spawn kconsumerd");
+               PERROR("clock_gettime spawn consumer");
                /* Infinite wait for the kconsumerd thread to be ready */
-               ret = sem_wait(&kconsumerd_sem);
+               ret = sem_wait(&consumer_data->sem);
        } else {
                /* Normal timeout if the gettime was successful */
                timeout.tv_sec += DEFAULT_SEM_WAIT_TIMEOUT;
-               ret = sem_timedwait(&kconsumerd_sem, &timeout);
+               ret = sem_timedwait(&consumer_data->sem, &timeout);
        }
 
        if (ret < 0) {
@@ -1303,24 +1330,24 @@ static int spawn_kconsumerd_thread(void)
                         * Call has timed out so we kill the kconsumerd_thread and return
                         * an error.
                         */
-                       ERR("The kconsumerd thread was never ready. Killing it");
-                       ret = pthread_cancel(kconsumerd_thread);
+                       ERR("The consumer thread was never ready. Killing it");
+                       ret = pthread_cancel(consumer_data->thread);
                        if (ret < 0) {
-                               PERROR("pthread_cancel kconsumerd_thread");
+                               PERROR("pthread_cancel consumer thread");
                        }
                } else {
-                       PERROR("semaphore wait failed kconsumerd thread");
+                       PERROR("semaphore wait failed consumer thread");
                }
                goto error;
        }
 
-       pthread_mutex_lock(&kconsumerd_pid_mutex);
-       if (kconsumerd_pid == 0) {
+       pthread_mutex_lock(&consumer_data->pid_mutex);
+       if (consumer_data->pid == 0) {
                ERR("Kconsumerd did not start");
-               pthread_mutex_unlock(&kconsumerd_pid_mutex);
+               pthread_mutex_unlock(&consumer_data->pid_mutex);
                goto error;
        }
-       pthread_mutex_unlock(&kconsumerd_pid_mutex);
+       pthread_mutex_unlock(&consumer_data->pid_mutex);
 
        return 0;
 
@@ -1329,96 +1356,103 @@ error:
 }
 
 /*
- * Join kernel consumer thread
+ * Join consumer thread
  */
-static int join_kconsumerd_thread(void)
+static int join_consumer_thread(struct consumer_data *consumer_data)
 {
        void *status;
        int ret;
 
-       if (kconsumerd_pid != 0) {
-               ret = kill(kconsumerd_pid, SIGTERM);
+       if (consumer_data->pid != 0) {
+               ret = kill(consumer_data->pid, SIGTERM);
                if (ret) {
-                       ERR("Error killing kconsumerd");
+                       ERR("Error killing consumer daemon");
                        return ret;
                }
-               return pthread_join(kconsumerd_thread, &status);
+               return pthread_join(consumer_data->thread, &status);
        } else {
                return 0;
        }
 }
 
 /*
- * Fork and exec a kernel consumer daemon (kconsumerd).
+ * Fork and exec a consumer daemon (consumerd).
  *
  * Return pid if successful else -1.
  */
-static pid_t spawn_kconsumerd(void)
+static pid_t spawn_consumerd(struct consumer_data *consumer_data)
 {
        int ret;
        pid_t pid;
        const char *verbosity;
 
-       DBG("Spawning kconsumerd");
+       DBG("Spawning consumerd");
 
        pid = fork();
        if (pid == 0) {
                /*
-                * Exec kconsumerd.
+                * Exec consumerd.
                 */
-               if (opt_verbose > 1 || opt_verbose_kconsumerd) {
+               if (opt_verbose > 1 || opt_verbose_consumer) {
                        verbosity = "--verbose";
                } else {
                        verbosity = "--quiet";
                }
-               execl(INSTALL_BIN_PATH "/ltt-kconsumerd",
-                               "ltt-kconsumerd", verbosity, NULL);
+               switch (consumer_data->type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       execl(INSTALL_BIN_PATH "/lttng-consumerd",
+                                       "lttng-consumerd", verbosity, "-k", NULL);
+                       break;
+               case LTTNG_CONSUMER_UST:
+                       execl(INSTALL_BIN_PATH "/lttng-consumerd",
+                                       "lttng-consumerd", verbosity, "-u", NULL);
+                       break;
+               default:
+                       perror("unknown consumer type");
+                       exit(EXIT_FAILURE);
+               }
                if (errno != 0) {
                        perror("kernel start consumer exec");
                }
                exit(EXIT_FAILURE);
        } else if (pid > 0) {
                ret = pid;
-               goto error;
        } else {
-               perror("kernel start consumer fork");
+               perror("start consumer fork");
                ret = -errno;
-               goto error;
        }
-
-error:
        return ret;
 }
 
 /*
- * Spawn the kconsumerd daemon and session daemon thread.
+ * Spawn the consumerd daemon and session daemon thread.
  */
-static int start_kconsumerd(void)
+static int start_consumerd(struct consumer_data *consumer_data)
 {
        int ret;
 
-       pthread_mutex_lock(&kconsumerd_pid_mutex);
-       if (kconsumerd_pid != 0) {
-               pthread_mutex_unlock(&kconsumerd_pid_mutex);
+       pthread_mutex_lock(&consumer_data->pid_mutex);
+       if (consumer_data->pid != 0) {
+               pthread_mutex_unlock(&consumer_data->pid_mutex);
                goto end;
        }
 
-       ret = spawn_kconsumerd();
+       ret = spawn_consumerd(consumer_data);
        if (ret < 0) {
-               ERR("Spawning kconsumerd failed");
-               pthread_mutex_unlock(&kconsumerd_pid_mutex);
+               ERR("Spawning consumerd failed");
+               pthread_mutex_unlock(&consumer_data->pid_mutex);
                goto error;
        }
 
-       /* Setting up the global kconsumerd_pid */
-       kconsumerd_pid = ret;
-       DBG2("Kconsumerd pid %d", kconsumerd_pid);
-       pthread_mutex_unlock(&kconsumerd_pid_mutex);
+       /* Setting up the consumer_data pid */
+       consumer_data->pid = ret;
+       DBG2("consumer pid %d", consumer_data->pid);
+       pthread_mutex_unlock(&consumer_data->pid_mutex);
 
-       DBG2("Spawning kconsumerd thread");
-       ret = spawn_kconsumerd_thread();
+       DBG2("Spawning consumer control thread");
+       ret = spawn_consumer_thread(consumer_data);
        if (ret < 0) {
-               ERR("Fatal error spawning kconsumerd thread");
+               ERR("Fatal error spawning consumer control thread");
                goto error;
        }
 
@@ -1580,23 +1614,23 @@ static int init_kernel_tracing(struct ltt_kernel_session *session)
 {
        int ret = 0;
 
-       if (session->kconsumer_fds_sent == 0) {
+       if (session->consumer_fds_sent == 0) {
                /*
                 * Assign default kernel consumer socket if no consumer assigned to the
                 * kernel session. At this point, it's NOT suppose to be 0 but this is
                 * an extra security check.
                 */
                if (session->consumer_fd == 0) {
-                       session->consumer_fd = kconsumerd_cmd_sock;
+                       session->consumer_fd = kconsumer_data.cmd_sock;
                }
 
-               ret = send_kconsumerd_fds(session);
+               ret = send_consumer_session_streams(&kconsumer_data, session);
                if (ret < 0) {
                        ret = LTTCOMM_KERN_CONSUMER_FAIL;
                        goto error;
                }
 
-               session->kconsumer_fds_sent = 1;
+               session->consumer_fds_sent = 1;
        }
 
 error:
@@ -1677,8 +1711,8 @@ static int create_kernel_session(struct ltt_session *session)
        }
 
        /* Set kernel consumer socket fd */
-       if (kconsumerd_cmd_sock) {
-               session->kernel_session->consumer_fd = kconsumerd_cmd_sock;
+       if (kconsumer_data.cmd_sock) {
+               session->kernel_session->consumer_fd = kconsumer_data.cmd_sock;
        }
 
        ret = mkdir_recursive(session->kernel_session->trace_path,
@@ -2614,17 +2648,17 @@ static int process_client_msg(struct command_ctx *cmd_ctx)
                        }
 
                        /* Start the kernel consumer daemon */
-                       pthread_mutex_lock(&kconsumerd_pid_mutex);
-                       if (kconsumerd_pid == 0 &&
+                       pthread_mutex_lock(&kconsumer_data.pid_mutex);
+                       if (kconsumer_data.pid == 0 &&
                                        cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) {
-                               pthread_mutex_unlock(&kconsumerd_pid_mutex);
-                               ret = start_kconsumerd();
+                               pthread_mutex_unlock(&kconsumer_data.pid_mutex);
+                               ret = start_consumerd(&kconsumer_data);
                                if (ret < 0) {
                                        ret = LTTCOMM_KERN_CONSUMER_FAIL;
                                        goto error;
                                }
                        }
-                       pthread_mutex_unlock(&kconsumerd_pid_mutex);
+                       pthread_mutex_unlock(&kconsumer_data.pid_mutex);
                }
                break;
        case LTTNG_DOMAIN_UST_PID:
@@ -3058,13 +3092,15 @@ static void usage(void)
        fprintf(stderr, "  -a, --apps-sock PATH               Specify path for apps unix socket\n");
        fprintf(stderr, "      --kconsumerd-err-sock PATH     Specify path for the kernel consumer error socket\n");
        fprintf(stderr, "      --kconsumerd-cmd-sock PATH     Specify path for the kernel consumer command socket\n");
+       fprintf(stderr, "      --ustconsumerd-err-sock PATH   Specify path for the UST consumer error socket\n");
+       fprintf(stderr, "      --ustconsumerd-cmd-sock PATH   Specify path for the UST consumer command socket\n");
        fprintf(stderr, "  -d, --daemonize                    Start as a daemon.\n");
        fprintf(stderr, "  -g, --group NAME                   Specify the tracing group name. (default: tracing)\n");
        fprintf(stderr, "  -V, --version                      Show version number.\n");
        fprintf(stderr, "  -S, --sig-parent                   Send SIGCHLD to parent pid to notify readiness.\n");
        fprintf(stderr, "  -q, --quiet                        No output at all.\n");
        fprintf(stderr, "  -v, --verbose                      Verbose mode. Activate DBG() macro.\n");
-       fprintf(stderr, "      --verbose-kconsumerd           Verbose mode for kconsumerd. Activate DBG() macro.\n");
+       fprintf(stderr, "      --verbose-consumer             Verbose mode for consumer. Activate DBG() macro.\n");
 }
 
 /*
@@ -3077,8 +3113,10 @@ static int parse_args(int argc, char **argv)
        static struct option long_options[] = {
                { "client-sock", 1, 0, 'c' },
                { "apps-sock", 1, 0, 'a' },
-               { "kconsumerd-cmd-sock", 1, 0, 0 },
-               { "kconsumerd-err-sock", 1, 0, 0 },
+               { "kconsumerd-cmd-sock", 1, 0, 'C' },
+               { "kconsumerd-err-sock", 1, 0, 'E' },
+               { "ustconsumerd-cmd-sock", 1, 0, 'D' },
+               { "ustconsumerd-err-sock", 1, 0, 'F' },
                { "daemonize", 0, 0, 'd' },
                { "sig-parent", 0, 0, 'S' },
                { "help", 0, 0, 'h' },
@@ -3086,13 +3124,13 @@ static int parse_args(int argc, char **argv)
                { "version", 0, 0, 'V' },
                { "quiet", 0, 0, 'q' },
                { "verbose", 0, 0, 'v' },
-               { "verbose-kconsumerd", 0, 0, 'Z' },
+               { "verbose-consumer", 0, 0, 'Z' },
                { NULL, 0, 0, 0 }
        };
 
        while (1) {
                int option_index = 0;
-               c = getopt_long(argc, argv, "dhqvVS" "a:c:g:s:E:C:Z",
+               c = getopt_long(argc, argv, "dhqvVS" "a:c:g:s:C:E:D:F:Z",
                                long_options, &option_index);
                if (c == -1) {
                        break;
@@ -3127,10 +3165,16 @@ static int parse_args(int argc, char **argv)
                        opt_sig_parent = 1;
                        break;
                case 'E':
-                       snprintf(kconsumerd_err_unix_sock_path, PATH_MAX, "%s", optarg);
+                       snprintf(kconsumer_data.err_unix_sock_path, PATH_MAX, "%s", optarg);
                        break;
                case 'C':
-                       snprintf(kconsumerd_cmd_unix_sock_path, PATH_MAX, "%s", optarg);
+                       snprintf(kconsumer_data.cmd_unix_sock_path, PATH_MAX, "%s", optarg);
+                       break;
+               case 'F':
+                       snprintf(ustconsumer_data.err_unix_sock_path, PATH_MAX, "%s", optarg);
+                       break;
+               case 'D':
+                       snprintf(ustconsumer_data.cmd_unix_sock_path, PATH_MAX, "%s", optarg);
                        break;
                case 'q':
                        opt_quiet = 1;
@@ -3140,7 +3184,7 @@ static int parse_args(int argc, char **argv)
                        opt_verbose += 1;
                        break;
                case 'Z':
-                       opt_verbose_kconsumerd += 1;
+                       opt_verbose_consumer += 1;
                        break;
                default:
                        /* Unknown option or other error.
@@ -3258,10 +3302,17 @@ static int set_permissions(void)
                perror("chown");
        }
 
-       /* kconsumerd error socket path */
-       ret = chown(kconsumerd_err_unix_sock_path, 0, gid);
+       /* kconsumer error socket path */
+       ret = chown(kconsumer_data.err_unix_sock_path, 0, gid);
        if (ret < 0) {
-               ERR("Unable to set group on %s", kconsumerd_err_unix_sock_path);
+               ERR("Unable to set group on %s", kconsumer_data.err_unix_sock_path);
+               perror("chown");
+       }
+
+       /* ustconsumer error socket path */
+       ret = chown(ustconsumer_data.err_unix_sock_path, 0, gid);
+       if (ret < 0) {
+               ERR("Unable to set group on %s", ustconsumer_data.err_unix_sock_path);
                perror("chown");
        }
 
@@ -3312,43 +3363,49 @@ error:
  * Setup sockets and directory needed by the kconsumerd communication with the
  * session daemon.
  */
-static int set_kconsumerd_sockets(void)
+static int set_consumer_sockets(struct consumer_data *consumer_data)
 {
        int ret;
+       const char *path = consumer_data->type == LTTNG_CONSUMER_KERNEL ?
+                       KCONSUMERD_PATH : USTCONSUMERD_PATH;
 
-       if (strlen(kconsumerd_err_unix_sock_path) == 0) {
-               snprintf(kconsumerd_err_unix_sock_path, PATH_MAX,
-                               KCONSUMERD_ERR_SOCK_PATH);
+       if (strlen(consumer_data->err_unix_sock_path) == 0) {
+               snprintf(consumer_data->err_unix_sock_path, PATH_MAX,
+                       consumer_data->type == LTTNG_CONSUMER_KERNEL ?
+                               KCONSUMERD_ERR_SOCK_PATH :
+                               USTCONSUMERD_ERR_SOCK_PATH);
        }
 
-       if (strlen(kconsumerd_cmd_unix_sock_path) == 0) {
-               snprintf(kconsumerd_cmd_unix_sock_path, PATH_MAX,
-                               KCONSUMERD_CMD_SOCK_PATH);
+       if (strlen(consumer_data->cmd_unix_sock_path) == 0) {
+               snprintf(consumer_data->cmd_unix_sock_path, PATH_MAX,
+                       consumer_data->type == LTTNG_CONSUMER_KERNEL ?
+                               KCONSUMERD_CMD_SOCK_PATH :
+                               USTCONSUMERD_CMD_SOCK_PATH);
        }
 
-       ret = mkdir(KCONSUMERD_PATH, S_IRWXU | S_IRWXG);
+       ret = mkdir(path, S_IRWXU | S_IRWXG);
        if (ret < 0) {
                if (errno != EEXIST) {
-                       ERR("Failed to create " KCONSUMERD_PATH);
+                       ERR("Failed to create %s", path);
                        goto error;
                }
                ret = 0;
        }
 
        /* Create the kconsumerd error unix socket */
-       kconsumerd_err_sock =
-               lttcomm_create_unix_sock(kconsumerd_err_unix_sock_path);
-       if (kconsumerd_err_sock < 0) {
-               ERR("Create unix sock failed: %s", kconsumerd_err_unix_sock_path);
+       consumer_data->err_sock =
+               lttcomm_create_unix_sock(consumer_data->err_unix_sock_path);
+       if (consumer_data->err_sock < 0) {
+               ERR("Create unix sock failed: %s", consumer_data->err_unix_sock_path);
                ret = -1;
                goto error;
        }
 
        /* File permission MUST be 660 */
-       ret = chmod(kconsumerd_err_unix_sock_path,
+       ret = chmod(consumer_data->err_unix_sock_path,
                        S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
        if (ret < 0) {
-               ERR("Set file permissions failed: %s", kconsumerd_err_unix_sock_path);
+               ERR("Set file permissions failed: %s", consumer_data->err_unix_sock_path);
                perror("chmod");
                goto error;
        }
@@ -3543,11 +3600,14 @@ int main(int argc, char **argv)
         * kernel tracer.
         */
        if (is_root) {
-               ret = set_kconsumerd_sockets();
+               ret = set_consumer_sockets(&kconsumer_data);
+               if (ret < 0) {
+                       goto exit;
+               }
+               ret = set_consumer_sockets(&ustconsumer_data);
                if (ret < 0) {
                        goto exit;
                }
-
                /* Setup kernel tracer */
                init_kernel_tracer();
 
@@ -3670,9 +3730,9 @@ exit_dispatch:
                goto error;     /* join error, exit without cleanup */
        }
 
-       ret = join_kconsumerd_thread();
+       ret = join_consumer_thread(&kconsumer_data);
        if (ret != 0) {
-               perror("join_kconsumerd");
+               perror("join_consumer");
                goto error;     /* join error, exit without cleanup */
        }
 
index 6bf5752fd76853686cf8f9aa1f194f6c588dbf20..2bf2f03c9e99c691b8363b3881e89efcc17c60d8 100644 (file)
@@ -82,7 +82,7 @@ struct ltt_kernel_stream {
 struct ltt_kernel_session {
        int fd;
        int metadata_stream_fd;
-       int kconsumer_fds_sent;
+       int consumer_fds_sent;
        int consumer_fd;
        unsigned int channel_count;
        unsigned int stream_count_global;
index fe007141e90c0c4d09bd9e2f481e1066c63bef50..cf1642a6e9163a75f3ce9c4c7c478476304486ec 100644 (file)
@@ -23,7 +23,6 @@
 
 #include <lttngerr.h>
 #include <lttng-share.h>
-#include <lttng-ust.h>
 
 #include "trace-ust.h"
 
@@ -123,7 +122,7 @@ struct ltt_ust_session *trace_ust_create_session(char *path, pid_t pid,
        /* Init data structure */
        lus->handle = -1;
        lus->enabled = 1;
-       lus->uconsumer_fds_sent = 0;
+       lus->consumer_fds_sent = 0;
        lus->metadata = NULL;
        lus->channels.count = 0;
        CDS_INIT_LIST_HEAD(&lus->channels.head);
index 9a236626931f58fbe566cd1b5a3fe301c9a85e2d..abbf9a9d191a8498f1e714e4435fa4df50acda56 100644 (file)
 #ifndef _LTT_TRACE_UST_H
 #define _LTT_TRACE_UST_H
 
+#include <config.h>
 #include <limits.h>
 #include <urcu/list.h>
-
 #include <lttng/lttng.h>
-#include <lttng-ust.h>
+
+/*
+ * FIXME: temporary workaround: we use a lttng-tools local version of
+ * lttng-ust-abi.h if UST is not found. Eventually, we should use our
+ * own internal structures within lttng-tools instead of relying on the
+ * UST ABI.
+ */
+#ifdef CONFIG_CONFIG_LTTNG_TOOLS_HAVE_UST
+#include <ust/lttng-ust-abi.h>
+#else
+#include "lttng-ust-abi.h"
+#endif
 
 /*
  * UST session list.
@@ -77,7 +88,7 @@ struct ltt_ust_metadata {
 struct ltt_ust_session {
        int handle;
        int enabled;
-       int uconsumer_fds_sent;
+       int consumer_fds_sent;
        char path[PATH_MAX];
        struct lttng_domain domain;
        struct ltt_ust_metadata *metadata;
@@ -85,6 +96,8 @@ struct ltt_ust_session {
        struct cds_list_head list;
 };
 
+#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST
+
 /*
  * Lookup functions. NULL is returned if not found.
  */
@@ -114,4 +127,67 @@ void trace_ust_destroy_metadata(struct ltt_ust_metadata *metadata);
 void trace_ust_destroy_channel(struct ltt_ust_channel *channel);
 void trace_ust_destroy_event(struct ltt_ust_event *event);
 
+#else
+
+static inline
+struct ltt_ust_event *trace_ust_get_event_by_name(
+               char *name, struct ltt_ust_channel *channel)
+{
+       return NULL;
+}
+static inline
+struct ltt_ust_channel *trace_ust_get_channel_by_name(
+               char *name, struct ltt_ust_session *session)
+{
+       return NULL;
+}
+static inline
+struct ltt_ust_session *trace_ust_get_session_by_pid(
+               struct ltt_ust_session_list *session_list, pid_t pid)
+{
+       return NULL;
+}
+
+static inline
+struct ltt_ust_session *trace_ust_create_session(char *path, pid_t pid,
+               struct lttng_domain *domain)
+{
+       return NULL;
+}
+static inline
+struct ltt_ust_channel *trace_ust_create_channel(struct lttng_channel *attr,
+               char *path)
+{
+       return NULL;
+}
+static inline
+struct ltt_ust_event *trace_ust_create_event(struct lttng_event *ev)
+{
+       return NULL;
+}
+static inline
+struct ltt_ust_metadata *trace_ust_create_metadata(char *path)
+{
+       return NULL;
+}
+
+static inline
+void trace_ust_destroy_session(struct ltt_ust_session *session)
+{
+}
+static inline
+void trace_ust_destroy_metadata(struct ltt_ust_metadata *metadata)
+{
+}
+static inline
+void trace_ust_destroy_channel(struct ltt_ust_channel *channel)
+{
+}
+static inline
+void trace_ust_destroy_event(struct ltt_ust_event *event)
+{
+}
+
+#endif
+
 #endif /* _LTT_TRACE_UST_H */
index f2d3d261c2a07394b79c55f7bc5e906f4bf68606..cc08a4c338b1da77f7f619d1af3ba0b822d5d08a 100644 (file)
@@ -40,7 +40,7 @@ struct ust_register_msg {
 /*
  * Traceable application list.
  */
-struct ust_app_list{
+struct ust_app_list {
        /*
         * This lock protects any read/write access to the list and count (which is
         * basically the list size). All public functions in traceable-app.c
@@ -76,6 +76,8 @@ struct ust_app {
        struct cds_list_head list;
 };
 
+#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST
+
 int ust_app_register(struct ust_register_msg *msg, int sock);
 void ust_app_unregister(int sock);
 unsigned int ust_app_list_count(void);
@@ -86,4 +88,48 @@ void ust_app_clean_list(void);
 struct ust_app_list *ust_app_get_list(void);
 struct ust_app *ust_app_get_by_pid(pid_t pid);
 
+#else
+
+static inline
+int ust_app_register(struct ust_register_msg *msg, int sock)
+{
+       return -ENOSYS;
+}
+static inline
+void ust_app_unregister(int sock)
+{
+}
+static inline
+unsigned int ust_app_list_count(void)
+{
+       return 0;
+}
+
+static inline
+void ust_app_lock_list(void)
+{
+}
+static inline
+void ust_app_unlock_list(void)
+{
+}
+static inline
+void ust_app_clean_list(void)
+{
+}
+static inline
+struct ust_app_list *ust_app_get_list(void)
+{
+       return NULL;
+}
+static inline
+struct ust_app *ust_app_get_by_pid(pid_t pid)
+{
+       return NULL;
+}
+
+
+
+#endif
+
 #endif /* _TRACEABLE_APP_H */
index 455d40c0af68cedb941120b5759d05fb5e4e75be..724260511e621660669e1fc3de28d1762279bc9a 100644 (file)
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
  */
 
+#include <config.h>
 #include <stdlib.h>
-
 #include <lttngerr.h>
-
 #include "ust-comm.h"
 
 /*
index 2753b6f5ab639b593692c62292a5740cd08c653f..2f43b1e60316435e8bfcdcb6989c90fb1095d370 100644 (file)
@@ -17,6 +17,7 @@
  */
 
 #define _GNU_SOURCE
+#include <config.h>
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
index c4d754ecaf290a69fae840efbad5b99ea850d6c8..a592abf2276d70cbfc7806a95873dfc53873e0dc 100644 (file)
@@ -23,6 +23,8 @@
 
 #include "trace-ust.h"
 
+#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST
+
 int ustctl_register_done(int sock);
 int ustctl_create_channel(int sock, struct ltt_ust_session *session,
                struct lttng_channel *channel);
@@ -33,4 +35,42 @@ int ustctl_disable_channel(int sock, struct ltt_ust_session *session,
 int ustctl_enable_channel(int sock, struct ltt_ust_session *session,
                struct ltt_ust_channel *chan);
 
+#else
+
+static inline
+int ustctl_register_done(int sock)
+{
+       return -ENOSYS;
+}
+static inline
+int ustctl_create_channel(int sock, struct ltt_ust_session *session,
+               struct lttng_channel *channel)
+{
+       return -ENOSYS;
+}
+static inline
+int ustctl_create_session(int sock, struct ltt_ust_session *session)
+{
+       return -ENOSYS;
+}
+static inline
+int ustctl_destroy_session(int sock, struct ltt_ust_session *session)
+{
+       return -ENOSYS;
+}
+static inline
+int ustctl_disable_channel(int sock, struct ltt_ust_session *session,
+               struct ltt_ust_channel *chan)
+{
+       return -ENOSYS;
+}
+static inline
+int ustctl_enable_channel(int sock, struct ltt_ust_session *session,
+               struct ltt_ust_channel *chan)
+{
+       return -ENOSYS;
+}
+
+#endif
+
 #endif /* _LTT_UST_CTL_H */
diff --git a/lttng-consumerd/Makefile.am b/lttng-consumerd/Makefile.am
new file mode 100644 (file)
index 0000000..2c1d564
--- /dev/null
@@ -0,0 +1,14 @@
+AM_CPPFLAGS = -I$(top_srcdir)/include
+
+bin_PROGRAMS = lttng-consumerd
+
+lttng_consumerd_SOURCES = lttng-consumerd.c
+
+lttng_consumerd_LDADD = \
+          $(top_builddir)/libkernelctl/libkernelctl.la \
+          $(top_builddir)/liblttng-consumer/liblttng-consumer.la \
+          $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la
+
+if LTTNG_TOOLS_HAVE_UST
+lttng_consumerd_LDADD += -lustctl
+endif
diff --git a/lttng-consumerd/lttng-consumerd.c b/lttng-consumerd/lttng-consumerd.c
new file mode 100644 (file)
index 0000000..cac71ca
--- /dev/null
@@ -0,0 +1,449 @@
+/*
+ * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
+ *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+ */
+
+#define _GNU_SOURCE
+#include <fcntl.h>
+#include <getopt.h>
+#include <grp.h>
+#include <limits.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ipc.h>
+#include <sys/shm.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <urcu/list.h>
+#include <poll.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <assert.h>
+#include <config.h>
+
+#include <lttng-consumerd.h>
+#include <lttng-kernel-ctl.h>
+#include <lttng-sessiond-comm.h>
+#include <lttng/lttng-kconsumer.h>
+#include <lttng/lttng-ustconsumer.h>
+#include <lttngerr.h>
+
+/* TODO : support UST (all direct kernctl accesses). */
+
+/* the two threads (receive fd and poll) */
+static pthread_t threads[2];
+
+/* to count the number of time the user pressed ctrl+c */
+static int sigintcount = 0;
+
+/* Argument variables */
+int opt_quiet;
+int opt_verbose;
+static int opt_daemon;
+static const char *progname;
+static char command_sock_path[PATH_MAX]; /* Global command socket path */
+static char error_sock_path[PATH_MAX]; /* Global error path */
+static enum lttng_consumer_type opt_type = LTTNG_CONSUMER_KERNEL;
+
+/* the liblttngkconsumerd context */
+static struct lttng_consumer_local_data *ctx;
+
+/*
+ * Signal handler for the daemon
+ */
+static void sighandler(int sig)
+{
+       if (sig == SIGINT && sigintcount++ == 0) {
+               DBG("ignoring first SIGINT");
+               return;
+       }
+
+       lttng_consumer_should_exit(ctx);
+}
+
+/*
+ * Setup signal handler for :
+ *      SIGINT, SIGTERM, SIGPIPE
+ */
+static int set_signal_handler(void)
+{
+       int ret = 0;
+       struct sigaction sa;
+       sigset_t sigset;
+
+       if ((ret = sigemptyset(&sigset)) < 0) {
+               perror("sigemptyset");
+               return ret;
+       }
+
+       sa.sa_handler = sighandler;
+       sa.sa_mask = sigset;
+       sa.sa_flags = 0;
+       if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
+               perror("sigaction");
+               return ret;
+       }
+
+       if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) {
+               perror("sigaction");
+               return ret;
+       }
+
+       if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
+               perror("sigaction");
+               return ret;
+       }
+
+       return ret;
+}
+
+/*
+ * usage function on stderr
+ */
+static void usage(void)
+{
+       fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
+       fprintf(stderr, "  -h, --help                         "
+                       "Display this usage.\n");
+       fprintf(stderr, "  -c, --kconsumerd-cmd-sock PATH     "
+                       "Specify path for the command socket\n");
+       fprintf(stderr, "  -e, --kconsumerd-err-sock PATH     "
+                       "Specify path for the error socket\n");
+       fprintf(stderr, "  -d, --daemonize                    "
+                       "Start as a daemon.\n");
+       fprintf(stderr, "  -q, --quiet                        "
+                       "No output at all.\n");
+       fprintf(stderr, "  -v, --verbose                      "
+                       "Verbose mode. Activate DBG() macro.\n");
+       fprintf(stderr, "  -V, --version                      "
+                       "Show version number.\n");
+       fprintf(stderr, "  -k, --kernel                       "
+                       "Consumer kernel buffers (default).\n");
+       fprintf(stderr, "  -u, --ust                          "
+                       "Consumer UST buffers.%s\n",
+#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST
+                       ""
+#else
+                       " (support not compiled in)"
+#endif
+                       );
+}
+
+/*
+ * daemon argument parsing
+ */
+static void parse_args(int argc, char **argv)
+{
+       int c;
+
+       static struct option long_options[] = {
+               { "kconsumerd-cmd-sock", 1, 0, 'c' },
+               { "kconsumerd-err-sock", 1, 0, 'e' },
+               { "daemonize", 0, 0, 'd' },
+               { "help", 0, 0, 'h' },
+               { "quiet", 0, 0, 'q' },
+               { "verbose", 0, 0, 'v' },
+               { "version", 0, 0, 'V' },
+               { "kernel", 0, 0, 'k' },
+#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST
+               { "ust", 0, 0, 'u' },
+#endif
+               { NULL, 0, 0, 0 }
+       };
+
+       while (1) {
+               int option_index = 0;
+               c = getopt_long(argc, argv, "dhqvVku" "c:e:", long_options, &option_index);
+               if (c == -1) {
+                       break;
+               }
+
+               switch (c) {
+               case 0:
+                       fprintf(stderr, "option %s", long_options[option_index].name);
+                       if (optarg) {
+                               fprintf(stderr, " with arg %s\n", optarg);
+                       }
+                       break;
+               case 'c':
+                       snprintf(command_sock_path, PATH_MAX, "%s", optarg);
+                       break;
+               case 'e':
+                       snprintf(error_sock_path, PATH_MAX, "%s", optarg);
+                       break;
+               case 'd':
+                       opt_daemon = 1;
+                       break;
+               case 'h':
+                       usage();
+                       exit(EXIT_FAILURE);
+               case 'q':
+                       opt_quiet = 1;
+                       break;
+               case 'v':
+                       opt_verbose = 1;
+                       break;
+               case 'V':
+                       fprintf(stdout, "%s\n", VERSION);
+                       exit(EXIT_SUCCESS);
+               case 'k':
+                       opt_type = LTTNG_CONSUMER_KERNEL;
+                       break;
+#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST
+               case 'u':
+                       opt_type = LTTNG_CONSUMER_UST;
+                       break;
+#endif
+               default:
+                       usage();
+                       exit(EXIT_FAILURE);
+               }
+       }
+}
+
+/*
+ * Consume data on a file descriptor and write it on a trace file.
+ */
+static int read_subbuffer(struct lttng_consumer_stream *stream)
+{
+       unsigned long len;
+       int err;
+       long ret = 0;
+       int infd = stream->wait_fd;
+
+       DBG("In read_subbuffer (infd : %d)", infd);
+       /* Get the next subbuffer */
+       err = kernctl_get_next_subbuf(infd);
+       if (err != 0) {
+               ret = errno;
+               /*
+                * This is a debug message even for single-threaded consumer,
+                * because poll() have more relaxed criterions than get subbuf,
+                * so get_subbuf may fail for short race windows where poll()
+                * would issue wakeups.
+                */
+               DBG("Reserving sub buffer failed (everything is normal, "
+                               "it is due to concurrency)");
+               goto end;
+       }
+
+       switch (stream->output) {
+               case LTTNG_EVENT_SPLICE:
+                       /* read the whole subbuffer */
+                       err = kernctl_get_padded_subbuf_size(infd, &len);
+                       if (err != 0) {
+                               ret = errno;
+                               perror("Getting sub-buffer len failed.");
+                               goto end;
+                       }
+
+                       /* splice the subbuffer to the tracefile */
+                       ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
+                       if (ret < 0) {
+                               /*
+                                * display the error but continue processing to try
+                                * to release the subbuffer
+                                */
+                               ERR("Error splicing to tracefile");
+                       }
+                       break;
+               case LTTNG_EVENT_MMAP:
+                       /* read the used subbuffer size */
+                       err = kernctl_get_padded_subbuf_size(infd, &len);
+                       if (err != 0) {
+                               ret = errno;
+                               perror("Getting sub-buffer len failed.");
+                               goto end;
+                       }
+                       /* write the subbuffer to the tracefile */
+                       ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
+                       if (ret < 0) {
+                               /*
+                                * display the error but continue processing to try
+                                * to release the subbuffer
+                                */
+                               ERR("Error writing to tracefile");
+                       }
+                       break;
+               default:
+                       ERR("Unknown output method");
+                       ret = -1;
+       }
+
+       err = kernctl_put_next_subbuf(infd);
+       if (err != 0) {
+               ret = errno;
+               if (errno == EFAULT) {
+                       perror("Error in unreserving sub buffer\n");
+               } else if (errno == EIO) {
+                       /* Should never happen with newer LTTng versions */
+                       perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
+               }
+               goto end;
+       }
+
+end:
+       return ret;
+}
+
+static int on_recv_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       /* Opening the tracefile in write mode */
+       if (stream->path_name != NULL) {
+               ret = open(stream->path_name,
+                               O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+               if (ret < 0) {
+                       ERR("Opening %s", stream->path_name);
+                       perror("open");
+                       goto error;
+               }
+               stream->out_fd = ret;
+       }
+
+       if (stream->output == LTTNG_EVENT_MMAP) {
+               /* get the len of the mmap region */
+               unsigned long mmap_len;
+
+               ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
+               if (ret != 0) {
+                       ret = errno;
+                       perror("kernctl_get_mmap_len");
+                       goto error_close_fd;
+               }
+               stream->mmap_len = (size_t) mmap_len;
+
+               stream->mmap_base = mmap(NULL, stream->mmap_len,
+                               PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
+               if (stream->mmap_base == MAP_FAILED) {
+                       perror("Error mmaping");
+                       ret = -1;
+                       goto error_close_fd;
+               }
+       }
+
+       /* we return 0 to let the library handle the FD internally */
+       return 0;
+
+error_close_fd:
+       {
+               int err;
+
+               err = close(stream->out_fd);
+               assert(!err);
+       }
+error:
+       return ret;
+}
+
+/*
+ * main
+ */
+int main(int argc, char **argv)
+{
+       int i;
+       int ret = 0;
+       void *status;
+
+       /* Parse arguments */
+       progname = argv[0];
+       parse_args(argc, argv);
+
+       /* Daemonize */
+       if (opt_daemon) {
+               ret = daemon(0, 0);
+               if (ret < 0) {
+                       perror("daemon");
+                       goto error;
+               }
+       }
+
+       if (strlen(command_sock_path) == 0) {
+               snprintf(command_sock_path, PATH_MAX,
+                       opt_type == LTTNG_CONSUMER_KERNEL ?
+                               KCONSUMERD_CMD_SOCK_PATH :
+                               USTCONSUMERD_CMD_SOCK_PATH);
+       }
+       /* create the consumer instance with and assign the callbacks */
+       ctx = lttng_consumer_create(opt_type, read_subbuffer, NULL, on_recv_stream, NULL);
+       if (ctx == NULL) {
+               goto error;
+       }
+
+       lttng_consumer_set_command_sock_path(ctx, command_sock_path);
+       if (strlen(error_sock_path) == 0) {
+               snprintf(error_sock_path, PATH_MAX,
+                       opt_type == LTTNG_CONSUMER_KERNEL ?
+                               KCONSUMERD_ERR_SOCK_PATH :
+                               USTCONSUMERD_ERR_SOCK_PATH);
+       }
+
+       if (set_signal_handler() < 0) {
+               goto error;
+       }
+
+       /* Connect to the socket created by ltt-sessiond to report errors */
+       DBG("Connecting to error socket %s", error_sock_path);
+       ret = lttcomm_connect_unix_sock(error_sock_path);
+       /* not a fatal error, but all communication with ltt-sessiond will fail */
+       if (ret < 0) {
+               WARN("Cannot connect to error socket, is ltt-sessiond started ?");
+       }
+       lttng_consumer_set_error_sock(ctx, ret);
+
+       /* Create the thread to manage the receive of fd */
+       ret = pthread_create(&threads[0], NULL, lttng_consumer_thread_receive_fds,
+                       (void *) ctx);
+       if (ret != 0) {
+               perror("pthread_create");
+               goto error;
+       }
+
+       /* Create thread to manage the polling/writing of traces */
+       ret = pthread_create(&threads[1], NULL, lttng_consumer_thread_poll_fds,
+                       (void *) ctx);
+       if (ret != 0) {
+               perror("pthread_create");
+               goto error;
+       }
+
+       for (i = 0; i < 2; i++) {
+               ret = pthread_join(threads[i], &status);
+               if (ret != 0) {
+                       perror("pthread_join");
+                       goto error;
+               }
+       }
+       ret = EXIT_SUCCESS;
+       lttng_consumer_send_error(ctx, CONSUMERD_EXIT_SUCCESS);
+       goto end;
+
+error:
+       ret = EXIT_FAILURE;
+       lttng_consumer_send_error(ctx, CONSUMERD_EXIT_FAILURE);
+
+end:
+       lttng_consumer_destroy(ctx);
+       lttng_consumer_cleanup();
+
+       return ret;
+}
index 7b8852308a047e96dad3a3de9c5b30251c1d75f6..4d8d75b124a98a01f7a0d759c882b305f02c622d 100644 (file)
@@ -24,6 +24,7 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <config.h>
 
 #include "../cmd.h"
 
index 4d81d2a1d4d9fe7914cd84bec9132207fed65f0c..b9c6b5a8e2d2a2163485407105e20ef119ccec16 100644 (file)
@@ -23,6 +23,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
+#include <config.h>
 
 #include <lttng/lttng.h>
 
index e46d9018696417000f325ddf8a0d8111ce48a79b..548714e94ffc8bb8fa79f6b45a0120b9905b4ba5 100644 (file)
@@ -69,7 +69,7 @@ static void create_one_kernel_session(void)
        printf("Validating kernel session: ");
        assert(kern->fd == 0);
        assert(kern->metadata_stream_fd == 0);
-       assert(kern->kconsumer_fds_sent == 0);
+       assert(kern->consumer_fds_sent == 0);
        assert(kern->channel_count == 0);
        assert(kern->stream_count_global == 0);
        assert(kern->metadata == NULL);
This page took 0.137741 seconds and 5 git commands to generate.