From: Jérémie Galarneau Date: Fri, 5 May 2017 03:52:24 +0000 (-0400) Subject: Implement consumer ring buffer position sampling X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=commitdiff_plain;h=e9404c27e7cc9d841785e6c4292c1add19fbc1cc Implement consumer ring buffer position sampling Signed-off-by: Jérémie Galarneau --- diff --git a/configure.ac b/configure.ac index 9bc394123..191dd838c 100644 --- a/configure.ac +++ b/configure.ac @@ -245,6 +245,7 @@ m4_define([_DEFAULT_CHANNEL_SUBBUF_NUM], [4]) m4_define([_DEFAULT_CHANNEL_SWITCH_TIMER], [0]) m4_define([_DEFAULT_CHANNEL_LIVE_TIMER], [0]) m4_define([_DEFAULT_CHANNEL_READ_TIMER], [200000]) +m4_define([_DEFAULT_CHANNEL_MONITOR_TIMER], [1000000]) _AC_DEFINE_AND_SUBST([DEFAULT_AGENT_TCP_PORT], [5345]) _AC_DEFINE_AND_SUBST([DEFAULT_APP_SOCKET_RW_TIMEOUT], [5]) _AC_DEFINE_AND_SUBST([DEFAULT_CHANNEL_SUBBUF_SIZE], [_DEFAULT_CHANNEL_SUBBUF_SIZE]) @@ -255,6 +256,7 @@ _AC_DEFINE_AND_SUBST([DEFAULT_KERNEL_CHANNEL_READ_TIMER], [_DEFAULT_CHANNEL_READ _AC_DEFINE_AND_SUBST([DEFAULT_KERNEL_CHANNEL_SUBBUF_NUM], [_DEFAULT_CHANNEL_SUBBUF_NUM]) _AC_DEFINE_AND_SUBST([DEFAULT_KERNEL_CHANNEL_SUBBUF_SIZE], [262144]) _AC_DEFINE_AND_SUBST([DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER], [_DEFAULT_CHANNEL_SWITCH_TIMER]) +_AC_DEFINE_AND_SUBST([DEFAULT_KERNEL_CHANNEL_MONITOR_TIMER], [_DEFAULT_CHANNEL_MONITOR_TIMER]) _AC_DEFINE_AND_SUBST([DEFAULT_LTTNG_LIVE_TIMER], [1000000]) _AC_DEFINE_AND_SUBST([DEFAULT_METADATA_CACHE_SIZE], [4096]) _AC_DEFINE_AND_SUBST([DEFAULT_METADATA_READ_TIMER], [0]) @@ -269,11 +271,13 @@ _AC_DEFINE_AND_SUBST([DEFAULT_UST_PID_CHANNEL_READ_TIMER], [0]) _AC_DEFINE_AND_SUBST([DEFAULT_UST_PID_CHANNEL_SUBBUF_NUM], [_DEFAULT_CHANNEL_SUBBUF_NUM]) _AC_DEFINE_AND_SUBST([DEFAULT_UST_PID_CHANNEL_SUBBUF_SIZE], [_DEFAULT_CHANNEL_SUBBUF_SIZE]) _AC_DEFINE_AND_SUBST([DEFAULT_UST_PID_CHANNEL_SWITCH_TIMER], [_DEFAULT_CHANNEL_SWITCH_TIMER]) +_AC_DEFINE_AND_SUBST([DEFAULT_UST_PID_CHANNEL_MONITOR_TIMER], [_DEFAULT_CHANNEL_MONITOR_TIMER]) _AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_LIVE_TIMER], [_DEFAULT_CHANNEL_LIVE_TIMER]) _AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_READ_TIMER], [0]) _AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_SUBBUF_NUM], [_DEFAULT_CHANNEL_SUBBUF_NUM]) _AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_SUBBUF_SIZE], [131072]) _AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_SWITCH_TIMER], [_DEFAULT_CHANNEL_SWITCH_TIMER]) +_AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_MONITOR_TIMER], [_DEFAULT_CHANNEL_MONITOR_TIMER]) _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_AGENT_BIND_ADDRESS], [localhost]) _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_CONTROL_BIND_ADDRESS], [0.0.0.0]) _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_DATA_BIND_ADDRESS], [0.0.0.0]) diff --git a/include/lttng/channel-internal.h b/include/lttng/channel-internal.h index 54e75c066..d78a720d4 100644 --- a/include/lttng/channel-internal.h +++ b/include/lttng/channel-internal.h @@ -21,6 +21,7 @@ struct lttng_channel_extended { uint64_t discarded_events; uint64_t lost_packets; + uint64_t monitor_timer_interval; } LTTNG_PACKED; #endif /* LTTNG_CHANNEL_INTERNAL_H */ diff --git a/src/bin/lttng-consumerd/lttng-consumerd.c b/src/bin/lttng-consumerd/lttng-consumerd.c index a32548fa0..9fb474753 100644 --- a/src/bin/lttng-consumerd/lttng-consumerd.c +++ b/src/bin/lttng-consumerd/lttng-consumerd.c @@ -583,6 +583,14 @@ exit_metadata_timer_thread: PERROR("pthread_join sessiond_thread"); retval = -1; } + + ret = consumer_timer_thread_get_channel_monitor_pipe(); + if (ret >= 0) { + ret = close(ret); + if (ret) { + PERROR("close channel monitor pipe"); + } + } exit_sessiond_thread: ret = pthread_join(data_thread, &status); diff --git a/src/bin/lttng-sessiond/channel.c b/src/bin/lttng-sessiond/channel.c index 88480c4b4..8a88ecced 100644 --- a/src/bin/lttng-sessiond/channel.c +++ b/src/bin/lttng-sessiond/channel.c @@ -41,6 +41,7 @@ struct lttng_channel *channel_new_default_attr(int dom, { struct lttng_channel *chan; const char *channel_name = DEFAULT_CHANNEL_NAME; + struct lttng_channel_extended *extended_attr = NULL; chan = zmalloc(sizeof(struct lttng_channel)); if (chan == NULL) { @@ -48,6 +49,14 @@ struct lttng_channel *channel_new_default_attr(int dom, goto error_alloc; } + extended_attr = zmalloc(sizeof(struct lttng_channel_extended)); + if (!extended_attr) { + PERROR("zmalloc channel extended init"); + goto error; + } + + chan->attr.extended.ptr = extended_attr; + /* Same for all domains. */ chan->attr.overwrite = DEFAULT_CHANNEL_OVERWRITE; chan->attr.tracefile_size = DEFAULT_CHANNEL_TRACEFILE_SIZE; @@ -63,6 +72,8 @@ struct lttng_channel *channel_new_default_attr(int dom, chan->attr.switch_timer_interval = DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER; chan->attr.read_timer_interval = DEFAULT_KERNEL_CHANNEL_READ_TIMER; chan->attr.live_timer_interval = DEFAULT_KERNEL_CHANNEL_LIVE_TIMER; + extended_attr->monitor_timer_interval = + DEFAULT_KERNEL_CHANNEL_MONITOR_TIMER; break; case LTTNG_DOMAIN_JUL: channel_name = DEFAULT_JUL_CHANNEL_NAME; @@ -86,6 +97,8 @@ common_ust: DEFAULT_UST_UID_CHANNEL_READ_TIMER; chan->attr.live_timer_interval = DEFAULT_UST_UID_CHANNEL_LIVE_TIMER; + extended_attr->monitor_timer_interval = + DEFAULT_UST_UID_CHANNEL_MONITOR_TIMER; break; case LTTNG_BUFFER_PER_PID: default: @@ -97,7 +110,9 @@ common_ust: chan->attr.read_timer_interval = DEFAULT_UST_PID_CHANNEL_READ_TIMER; chan->attr.live_timer_interval = - DEFAULT_UST_UID_CHANNEL_LIVE_TIMER; + DEFAULT_UST_PID_CHANNEL_LIVE_TIMER; + extended_attr->monitor_timer_interval = + DEFAULT_UST_PID_CHANNEL_MONITOR_TIMER; break; } break; @@ -113,11 +128,21 @@ common_ust: return chan; error: + free(extended_attr); free(chan); error_alloc: return NULL; } +void channel_attr_destroy(struct lttng_channel *channel) +{ + if (!channel) { + return; + } + free(channel->attr.extended.ptr); + free(channel); +} + /* * Disable kernel channel of the kernel session. */ @@ -249,7 +274,7 @@ int channel_kernel_create(struct ltt_kernel_session *ksession, ret = LTTNG_OK; error: - free(defattr); + channel_attr_destroy(defattr); return ret; } @@ -459,7 +484,7 @@ int channel_ust_create(struct ltt_ust_session *usess, } } - free(defattr); + channel_attr_destroy(defattr); return LTTNG_OK; error_free_chan: @@ -469,7 +494,7 @@ error_free_chan: */ trace_ust_destroy_channel(uchan); error: - free(defattr); + channel_attr_destroy(defattr); return ret; } diff --git a/src/bin/lttng-sessiond/channel.h b/src/bin/lttng-sessiond/channel.h index 15cabea74..9b736fc66 100644 --- a/src/bin/lttng-sessiond/channel.h +++ b/src/bin/lttng-sessiond/channel.h @@ -32,6 +32,7 @@ int channel_kernel_create(struct ltt_kernel_session *ksession, struct lttng_channel *channel_new_default_attr(int domain, enum lttng_buffer_type type); +void channel_attr_destroy(struct lttng_channel *channel); int channel_ust_create(struct ltt_ust_session *usess, struct lttng_channel *attr, enum lttng_buffer_type type); diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 6ee397579..4a7287b61 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -748,7 +748,6 @@ int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd) } ret = consumer_recv_status_reply(sock); - error: return ret; } @@ -806,6 +805,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int switch_timer_interval, unsigned int read_timer_interval, unsigned int live_timer_interval, + unsigned int monitor_timer_interval, int output, int type, uint64_t session_id, @@ -837,6 +837,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.switch_timer_interval = switch_timer_interval; msg->u.ask_channel.read_timer_interval = read_timer_interval; msg->u.ask_channel.live_timer_interval = live_timer_interval; + msg->u.ask_channel.monitor_timer_interval = monitor_timer_interval; msg->u.ask_channel.output = output; msg->u.ask_channel.type = type; msg->u.ask_channel.session_id = session_id; @@ -892,7 +893,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t tracefile_size, uint64_t tracefile_count, unsigned int monitor, - unsigned int live_timer_interval) + unsigned int live_timer_interval, + unsigned int monitor_timer_interval) { assert(msg); @@ -913,6 +915,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.tracefile_count = tracefile_count; msg->u.channel.monitor = monitor; msg->u.channel.live_timer_interval = live_timer_interval; + msg->u.channel.monitor_timer_interval = monitor_timer_interval; strncpy(msg->u.channel.pathname, pathname, sizeof(msg->u.channel.pathname)); @@ -1048,6 +1051,35 @@ error: return ret; } +int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, + int pipe) +{ + int ret; + struct lttcomm_consumer_msg msg; + + /* Code flow error. Safety net. */ + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE; + + DBG3("Sending set_channel_monitor_pipe command to consumer"); + ret = consumer_send_msg(consumer_sock, &msg); + if (ret < 0) { + goto error; + } + + DBG3("Sending channel monitoring pipe %d to consumer on socket %d", + pipe, *consumer_sock->fd_ptr); + ret = consumer_send_fds(consumer_sock, &pipe, 1); + if (ret < 0) { + goto error; + } + + DBG2("Channel monitoring pipe successfully sent"); +error: + return ret; +} + /* * Set consumer subdirectory using the session name and a generated datetime if * needed. This is appended to the current subdirectory. diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 08b57eb73..77bc2b1f1 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -93,6 +93,11 @@ struct consumer_data { int err_sock; /* These two sockets uses the cmd_unix_sock_path. */ int cmd_sock; + /* + * Write-end of the channel monitoring pipe to be passed to the + * consumer. + */ + int channel_monitor_pipe; /* * The metadata socket object is handled differently and only created * locally in this object thus it's the only reference available in the @@ -214,6 +219,8 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, enum lttng_stream_type type, uint64_t session_id, char *session_name, char *hostname, int session_live_timer); +int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, + int pipe); int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer); int consumer_recv_status_reply(struct consumer_socket *sock); @@ -232,6 +239,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int switch_timer_interval, unsigned int read_timer_interval, unsigned int live_timer_interval, + unsigned int monitor_timer_interval, int output, int type, uint64_t session_id, @@ -273,7 +281,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t tracefile_size, uint64_t tracefile_count, unsigned int monitor, - unsigned int live_timer_interval); + unsigned int live_timer_interval, + unsigned int monitor_timer_interval); int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer); int consumer_close_metadata(struct consumer_socket *socket, diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index 2241acbca..a65e1493c 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -28,6 +28,9 @@ #include "consumer.h" #include "health-sessiond.h" #include "kernel-consumer.h" +#include "notification-thread-commands.h" +#include "session.h" +#include "lttng-sessiond.h" static char *create_channel_path(struct consumer_output *consumer, uid_t uid, gid_t gid) @@ -87,26 +90,33 @@ error: * Sending a single channel to the consumer with command ADD_CHANNEL. */ int kernel_consumer_add_channel(struct consumer_socket *sock, - struct ltt_kernel_channel *channel, struct ltt_kernel_session *session, + struct ltt_kernel_channel *channel, + struct ltt_kernel_session *ksession, unsigned int monitor) { int ret; char *pathname; struct lttcomm_consumer_msg lkm; struct consumer_output *consumer; + enum lttng_error_code status; + struct ltt_session *session; + struct lttng_channel_extended *channel_attr_extended; /* Safety net */ assert(channel); - assert(session); - assert(session->consumer); + assert(ksession); + assert(ksession->consumer); - consumer = session->consumer; + consumer = ksession->consumer; + channel_attr_extended = (struct lttng_channel_extended *) + channel->channel->attr.extended.ptr; DBG("Kernel consumer adding channel %s to kernel consumer", channel->channel->name); if (monitor) { - pathname = create_channel_path(consumer, session->uid, session->gid); + pathname = create_channel_path(consumer, ksession->uid, + ksession->gid); } else { /* Empty path. */ pathname = strdup(""); @@ -120,10 +130,10 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, consumer_init_channel_comm_msg(&lkm, LTTNG_CONSUMER_ADD_CHANNEL, channel->fd, - session->id, + ksession->id, pathname, - session->uid, - session->gid, + ksession->uid, + ksession->gid, consumer->net_seq_index, channel->channel->name, channel->stream_count, @@ -132,7 +142,8 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, channel->channel->attr.tracefile_size, channel->channel->attr.tracefile_count, monitor, - channel->channel->attr.live_timer_interval); + channel->channel->attr.live_timer_interval, + channel_attr_extended->monitor_timer_interval); health_code_update(); @@ -142,7 +153,21 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, } health_code_update(); + rcu_read_lock(); + session = session_find_by_id(ksession->id); + assert(session); + status = notification_thread_command_add_channel( + notification_thread_handle, session->name, + ksession->uid, ksession->gid, + channel->channel->name, channel->fd, + LTTNG_DOMAIN_KERNEL, + channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf); + rcu_read_unlock(); + if (status != LTTNG_OK) { + ret = -1; + goto error; + } error: free(pathname); return ret; @@ -194,7 +219,7 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, DEFAULT_KERNEL_CHANNEL_OUTPUT, CONSUMER_CHANNEL_TYPE_METADATA, 0, 0, - monitor, 0); + monitor, 0, 0); health_code_update(); diff --git a/src/bin/lttng-sessiond/lttng-sessiond.h b/src/bin/lttng-sessiond/lttng-sessiond.h index d7ee414f3..74552db6e 100644 --- a/src/bin/lttng-sessiond/lttng-sessiond.h +++ b/src/bin/lttng-sessiond/lttng-sessiond.h @@ -29,6 +29,7 @@ #include "session.h" #include "ust-app.h" #include "version.h" +#include "notification-thread.h" extern const char default_home_dir[], default_tracing_group[], @@ -38,6 +39,8 @@ extern const char default_home_dir[], /* Set in main.c at boot time of the daemon */ extern int kernel_tracer_fd; +extern struct notification_thread_handle *notification_thread_handle; + /* * This contains extra data needed for processing a command received by the * session daemon from the lttng client. @@ -121,6 +124,8 @@ extern const char * const config_section_name; /* Is this daemon root or not. */ extern int is_root; +extern const char *tracing_group_name; + int sessiond_check_thread_quit_pipe(int fd, uint32_t events); int sessiond_set_thread_pollset(struct lttng_poll_event *events, size_t size); void sessiond_notify_ready(void); diff --git a/src/bin/lttng-sessiond/trace-kernel.c b/src/bin/lttng-sessiond/trace-kernel.c index 29432de49..d6ee8e8af 100644 --- a/src/bin/lttng-sessiond/trace-kernel.c +++ b/src/bin/lttng-sessiond/trace-kernel.c @@ -26,6 +26,8 @@ #include "consumer.h" #include "trace-kernel.h" +#include "lttng-sessiond.h" +#include "notification-thread-commands.h" /* * Find the channel name for the given kernel session. @@ -179,6 +181,7 @@ struct ltt_kernel_channel *trace_kernel_create_channel( struct lttng_channel *chan) { struct ltt_kernel_channel *lkc; + struct lttng_channel_extended *extended; assert(chan); @@ -191,10 +194,18 @@ struct ltt_kernel_channel *trace_kernel_create_channel( lkc->channel = zmalloc(sizeof(struct lttng_channel)); if (lkc->channel == NULL) { PERROR("lttng_channel zmalloc"); - free(lkc); + goto error; + } + + extended = zmalloc(sizeof(struct lttng_channel_extended)); + if (!extended) { + PERROR("lttng_channel_channel zmalloc"); goto error; } memcpy(lkc->channel, chan, sizeof(struct lttng_channel)); + memcpy(extended, chan->attr.extended.ptr, sizeof(struct lttng_channel_extended)); + lkc->channel->attr.extended.ptr = extended; + extended = NULL; /* * If we receive an empty string for channel name, it means the @@ -218,6 +229,11 @@ struct ltt_kernel_channel *trace_kernel_create_channel( return lkc; error: + if (lkc) { + free(lkc->channel); + } + free(extended); + free(lkc); return NULL; } @@ -475,6 +491,7 @@ void trace_kernel_destroy_channel(struct ltt_kernel_channel *channel) struct ltt_kernel_event *event, *etmp; struct ltt_kernel_context *ctx, *ctmp; int ret; + enum lttng_error_code status; assert(channel); @@ -505,6 +522,11 @@ void trace_kernel_destroy_channel(struct ltt_kernel_channel *channel) /* Remove from channel list */ cds_list_del(&channel->list); + status = notification_thread_command_remove_channel( + notification_thread_handle, + channel->fd, LTTNG_DOMAIN_KERNEL); + assert(status == LTTNG_OK); + free(channel->channel->attr.extended.ptr); free(channel->channel); free(channel); } diff --git a/src/bin/lttng-sessiond/trace-ust.c b/src/bin/lttng-sessiond/trace-ust.c index 1c325fb25..55ca4fb3c 100644 --- a/src/bin/lttng-sessiond/trace-ust.c +++ b/src/bin/lttng-sessiond/trace-ust.c @@ -356,6 +356,8 @@ struct ltt_ust_channel *trace_ust_create_channel(struct lttng_channel *chan, luc->attr.switch_timer_interval = chan->attr.switch_timer_interval; luc->attr.read_timer_interval = chan->attr.read_timer_interval; luc->attr.output = (enum lttng_ust_output) chan->attr.output; + luc->monitor_timer_interval = + ((struct lttng_channel_extended *) chan->attr.extended.ptr)->monitor_timer_interval; /* Translate to UST output enum */ switch (luc->attr.output) { diff --git a/src/bin/lttng-sessiond/trace-ust.h b/src/bin/lttng-sessiond/trace-ust.h index fc9eef4b7..33b0a2031 100644 --- a/src/bin/lttng-sessiond/trace-ust.h +++ b/src/bin/lttng-sessiond/trace-ust.h @@ -82,6 +82,7 @@ struct ltt_ust_channel { uint64_t tracefile_count; uint64_t per_pid_closed_app_discarded; uint64_t per_pid_closed_app_lost; + uint64_t monitor_timer_interval; }; /* UST domain global (LTTNG_DOMAIN_UST) */ diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 5a41c3800..d292779d1 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -41,6 +41,8 @@ #include "ust-ctl.h" #include "utils.h" #include "session.h" +#include "lttng-sessiond.h" +#include "notification-thread-commands.h" static int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess); @@ -482,7 +484,8 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan, /* Wipe and free registry from session registry. */ registry = get_session_registry(ua_chan->session); if (registry) { - ust_registry_channel_del_free(registry, ua_chan->key); + ust_registry_channel_del_free(registry, ua_chan->key, + true); } save_per_pid_lost_discarded_counters(ua_chan); } @@ -1793,6 +1796,7 @@ static void shadow_copy_channel(struct ust_app_channel *ua_chan, ua_chan->attr.overwrite = uchan->attr.overwrite; ua_chan->attr.switch_timer_interval = uchan->attr.switch_timer_interval; ua_chan->attr.read_timer_interval = uchan->attr.read_timer_interval; + ua_chan->monitor_timer_interval = uchan->monitor_timer_interval; ua_chan->attr.output = uchan->attr.output; /* * Note that the attribute channel type is not set since the channel on the @@ -2839,6 +2843,7 @@ static int create_channel_per_uid(struct ust_app *app, int ret; struct buffer_reg_uid *reg_uid; struct buffer_reg_channel *reg_chan; + bool created = false; assert(app); assert(usess); @@ -2882,7 +2887,7 @@ static int create_channel_per_uid(struct ust_app *app, * it's not visible anymore in the session registry. */ ust_registry_channel_del_free(reg_uid->registry->reg.ust, - ua_chan->tracing_channel_id); + ua_chan->tracing_channel_id, false); buffer_reg_channel_remove(reg_uid->registry, reg_chan); buffer_reg_channel_destroy(reg_chan, LTTNG_DOMAIN_UST); goto error; @@ -2898,7 +2903,7 @@ static int create_channel_per_uid(struct ust_app *app, ua_chan->name); goto error; } - + created = true; } /* Send buffers to the application. */ @@ -2910,6 +2915,41 @@ static int create_channel_per_uid(struct ust_app *app, goto error; } + if (created) { + enum lttng_error_code cmd_ret; + struct ltt_session *session; + uint64_t chan_reg_key; + struct ust_registry_channel *chan_reg; + + rcu_read_lock(); + chan_reg_key = ua_chan->tracing_channel_id; + + pthread_mutex_lock(®_uid->registry->reg.ust->lock); + chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust, + chan_reg_key); + assert(chan_reg); + chan_reg->consumer_key = ua_chan->key; + chan_reg = NULL; + pthread_mutex_unlock(®_uid->registry->reg.ust->lock); + + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + + cmd_ret = notification_thread_command_add_channel( + notification_thread_handle, session->name, + ua_sess->euid, ua_sess->egid, + ua_chan->name, + ua_chan->key, + LTTNG_DOMAIN_UST, + ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf); + rcu_read_unlock(); + if (cmd_ret != LTTNG_OK) { + ret = - (int) cmd_ret; + ERR("Failed to add channel to notification thread"); + goto error; + } + } + error: return ret; } @@ -2925,6 +2965,10 @@ static int create_channel_per_pid(struct ust_app *app, { int ret; struct ust_registry_session *registry; + enum lttng_error_code cmd_ret; + struct ltt_session *session; + uint64_t chan_reg_key; + struct ust_registry_channel *chan_reg; assert(app); assert(usess); @@ -2963,6 +3007,29 @@ static int create_channel_per_pid(struct ust_app *app, goto error; } + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + + chan_reg_key = ua_chan->key; + pthread_mutex_lock(®istry->lock); + chan_reg = ust_registry_channel_find(registry, chan_reg_key); + assert(chan_reg); + chan_reg->consumer_key = ua_chan->key; + pthread_mutex_unlock(®istry->lock); + + cmd_ret = notification_thread_command_add_channel( + notification_thread_handle, session->name, + ua_sess->euid, ua_sess->egid, + ua_chan->name, + ua_chan->key, + LTTNG_DOMAIN_UST, + ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf); + if (cmd_ret != LTTNG_OK) { + ret = - (int) cmd_ret; + ERR("Failed to add channel to notification thread"); + goto error; + } + error: rcu_read_unlock(); return ret; @@ -3075,7 +3142,6 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess, /* Only add the channel if successful on the tracer side. */ lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node); - end: if (ua_chanp) { *ua_chanp = ua_chan; diff --git a/src/bin/lttng-sessiond/ust-app.h b/src/bin/lttng-sessiond/ust-app.h index 48f9fd381..03a50e877 100644 --- a/src/bin/lttng-sessiond/ust-app.h +++ b/src/bin/lttng-sessiond/ust-app.h @@ -157,6 +157,7 @@ struct ust_app_channel { struct lttng_ht *events; uint64_t tracefile_size; uint64_t tracefile_count; + uint64_t monitor_timer_interval; /* * Node indexed by channel name in the channels' hash table of a session. */ diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 3bb54f039..fe2c8f4c8 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -32,6 +32,7 @@ #include "ust-consumer.h" #include "buffer-registry.h" #include "session.h" +#include "lttng-sessiond.h" /* * Return allocated full pathname of the session using the consumer trace path @@ -94,7 +95,7 @@ error: } /* - * Send a single channel to the consumer using command ADD_CHANNEL. + * Send a single channel to the consumer using command ASK_CHANNEL_CREATION. * * Consumer socket lock MUST be acquired before calling this. */ @@ -174,6 +175,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, ua_chan->attr.switch_timer_interval, ua_chan->attr.read_timer_interval, ua_sess->live_timer_interval, + ua_chan->monitor_timer_interval, output, (int) ua_chan->attr.type, ua_sess->tracing_id, @@ -223,6 +225,8 @@ error: /* * Ask consumer to create a channel for a given session. * + * Session list and rcu read side locks must be held by the caller. + * * Returns 0 on success else a negative value. */ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, @@ -230,6 +234,7 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, struct consumer_socket *socket, struct ust_registry_session *registry) { int ret; + struct ltt_session *session; assert(ua_sess); assert(ua_chan); @@ -243,10 +248,14 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, goto error; } + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + pthread_mutex_lock(socket->lock); ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry); pthread_mutex_unlock(socket->lock); if (ret < 0) { + ERR("ask_channel_creation consumer command failed"); goto error; } diff --git a/src/bin/lttng-sessiond/ust-registry.c b/src/bin/lttng-sessiond/ust-registry.c index 3c3aa91f0..88d663170 100644 --- a/src/bin/lttng-sessiond/ust-registry.c +++ b/src/bin/lttng-sessiond/ust-registry.c @@ -26,6 +26,8 @@ #include "ust-registry.h" #include "ust-app.h" #include "utils.h" +#include "lttng-sessiond.h" +#include "notification-thread-commands.h" /* * Hash table match function for event in the registry. @@ -695,13 +697,23 @@ void destroy_channel_rcu(struct rcu_head *head) * free the registry pointer since it might not have been allocated before so * it's the caller responsability. */ -static void destroy_channel(struct ust_registry_channel *chan) +static void destroy_channel(struct ust_registry_channel *chan, bool notif) { struct lttng_ht_iter iter; struct ust_registry_event *event; + enum lttng_error_code cmd_ret; assert(chan); + if (notif) { + cmd_ret = notification_thread_command_remove_channel( + notification_thread_handle, chan->consumer_key, + LTTNG_DOMAIN_UST); + if (cmd_ret != LTTNG_OK) { + ERR("Failed to remove channel from notification thread"); + } + } + rcu_read_lock(); /* Destroy all event associated with this registry. */ cds_lfht_for_each_entry(chan->ht->ht, &iter.iter, event, node.node) { @@ -759,7 +771,7 @@ int ust_registry_channel_add(struct ust_registry_session *session, return 0; error: - destroy_channel(chan); + destroy_channel(chan, false); error_alloc: return ret; } @@ -798,7 +810,7 @@ end: * Remove channel using key from registry and free memory. */ void ust_registry_channel_del_free(struct ust_registry_session *session, - uint64_t key) + uint64_t key, bool notif) { struct lttng_ht_iter iter; struct ust_registry_channel *chan; @@ -817,7 +829,7 @@ void ust_registry_channel_del_free(struct ust_registry_session *session, ret = lttng_ht_del(session->channels, &iter); assert(!ret); rcu_read_unlock(); - destroy_channel(chan); + destroy_channel(chan, notif); end: return; @@ -972,7 +984,7 @@ void ust_registry_session_destroy(struct ust_registry_session *reg) /* Delete the node from the ht and free it. */ ret = lttng_ht_del(reg->channels, &iter); assert(!ret); - destroy_channel(chan); + destroy_channel(chan, true); } rcu_read_unlock(); ht_cleanup_push(reg->channels); diff --git a/src/bin/lttng-sessiond/ust-registry.h b/src/bin/lttng-sessiond/ust-registry.h index 2697cf2ec..414975caf 100644 --- a/src/bin/lttng-sessiond/ust-registry.h +++ b/src/bin/lttng-sessiond/ust-registry.h @@ -109,6 +109,7 @@ struct ust_registry_session { struct ust_registry_channel { uint64_t key; + uint64_t consumer_key; /* Id set when replying to a register channel. */ uint32_t chan_id; enum ustctl_channel_header header_type; @@ -248,7 +249,7 @@ struct ust_registry_channel *ust_registry_channel_find( int ust_registry_channel_add(struct ust_registry_session *session, uint64_t key); void ust_registry_channel_del_free(struct ust_registry_session *session, - uint64_t key); + uint64_t key, bool notif); int ust_registry_session_init(struct ust_registry_session **sessionp, struct ust_app *app, @@ -313,7 +314,7 @@ int ust_registry_channel_add(struct ust_registry_session *session, } static inline void ust_registry_channel_del_free(struct ust_registry_session *session, - uint64_t key) + uint64_t key, bool notif) {} static inline int ust_registry_session_init(struct ust_registry_session **sessionp, diff --git a/src/common/consumer/consumer-timer.c b/src/common/consumer/consumer-timer.c index b0a434284..09cf36293 100644 --- a/src/common/consumer/consumer-timer.c +++ b/src/common/consumer/consumer-timer.c @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -31,6 +32,12 @@ #include #include +typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream); +typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream, + unsigned long *consumed); +typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream, + unsigned long *produced); + static struct timer_signal_data timer_signal = { .tid = 0, .setup_done = 0, @@ -61,8 +68,14 @@ static void setmask(sigset_t *mask) if (ret) { PERROR("sigaddset live"); } + ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR); + if (ret) { + PERROR("sigaddset monitor"); + } } +static int channel_monitor_pipe = -1; + /* * Execute action on a timer switch. * @@ -71,7 +84,7 @@ static void setmask(sigset_t *mask) * deadlocks. */ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, - int sig, siginfo_t *si, void *uc) + int sig, siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; @@ -304,7 +317,7 @@ end: * Execute action on a live timer */ static void live_timer(struct lttng_consumer_local_data *ctx, - int sig, siginfo_t *si, void *uc) + int sig, siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; @@ -411,44 +424,95 @@ void consumer_timer_signal_thread_qs(unsigned int signr) } /* - * Set the timer for periodical metadata flush. + * Start a timer channel timer which will fire at a given interval + * (timer_interval_us)and fire a given signal (signal). + * + * Returns a negative value on error, 0 if a timer was created, and + * a positive value if no timer was created (not an error). */ -void consumer_timer_switch_start(struct lttng_consumer_channel *channel, - unsigned int switch_timer_interval) +static +int consumer_channel_timer_start(timer_t *timer_id, + struct lttng_consumer_channel *channel, + unsigned int timer_interval_us, int signal) { - int ret; + int ret = 0, delete_ret; struct sigevent sev; struct itimerspec its; assert(channel); assert(channel->key); - if (switch_timer_interval == 0) { - return; + if (timer_interval_us == 0) { + /* No creation needed; not an error. */ + ret = 1; + goto end; } sev.sigev_notify = SIGEV_SIGNAL; - sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH; + sev.sigev_signo = signal; sev.sigev_value.sival_ptr = channel; - ret = timer_create(CLOCKID, &sev, &channel->switch_timer); + ret = timer_create(CLOCKID, &sev, timer_id); if (ret == -1) { PERROR("timer_create"); + goto end; } - channel->switch_timer_enabled = 1; - its.it_value.tv_sec = switch_timer_interval / 1000000; - its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000; + its.it_value.tv_sec = timer_interval_us / 1000000; + its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000; its.it_interval.tv_sec = its.it_value.tv_sec; its.it_interval.tv_nsec = its.it_value.tv_nsec; - ret = timer_settime(channel->switch_timer, 0, &its, NULL); + ret = timer_settime(*timer_id, 0, &its, NULL); if (ret == -1) { PERROR("timer_settime"); + goto error_destroy_timer; + } +end: + return ret; +error_destroy_timer: + delete_ret = timer_delete(*timer_id); + if (delete_ret == -1) { + PERROR("timer_delete"); + } + goto end; +} + +static +int consumer_channel_timer_stop(timer_t *timer_id, int signal) +{ + int ret = 0; + + ret = timer_delete(*timer_id); + if (ret == -1) { + PERROR("timer_delete"); + goto end; } + + consumer_timer_signal_thread_qs(signal); + *timer_id = 0; +end: + return ret; +} + +/* + * Set the channel's switch timer. + */ +void consumer_timer_switch_start(struct lttng_consumer_channel *channel, + unsigned int switch_timer_interval_us) +{ + int ret; + + assert(channel); + assert(channel->key); + + ret = consumer_channel_timer_start(&channel->switch_timer, channel, + switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH); + + channel->switch_timer_enabled = !!(ret == 0); } /* - * Stop and delete timer. + * Stop and delete the channel's switch timer. */ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) { @@ -456,72 +520,91 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) assert(channel); - ret = timer_delete(channel->switch_timer); + ret = consumer_channel_timer_stop(&channel->switch_timer, + LTTNG_CONSUMER_SIG_SWITCH); if (ret == -1) { - PERROR("timer_delete"); + ERR("Failed to stop switch timer"); } - consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH); - - channel->switch_timer = 0; channel->switch_timer_enabled = 0; } /* - * Set the timer for the live mode. + * Set the channel's live timer. */ void consumer_timer_live_start(struct lttng_consumer_channel *channel, - int live_timer_interval) + unsigned int live_timer_interval_us) { int ret; - struct sigevent sev; - struct itimerspec its; assert(channel); assert(channel->key); - if (live_timer_interval <= 0) { - return; - } + ret = consumer_channel_timer_start(&channel->live_timer, channel, + live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE); - sev.sigev_notify = SIGEV_SIGNAL; - sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE; - sev.sigev_value.sival_ptr = channel; - ret = timer_create(CLOCKID, &sev, &channel->live_timer); - if (ret == -1) { - PERROR("timer_create"); - } - channel->live_timer_enabled = 1; + channel->live_timer_enabled = !!(ret == 0); +} - its.it_value.tv_sec = live_timer_interval / 1000000; - its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000; - its.it_interval.tv_sec = its.it_value.tv_sec; - its.it_interval.tv_nsec = its.it_value.tv_nsec; +/* + * Stop and delete the channel's live timer. + */ +void consumer_timer_live_stop(struct lttng_consumer_channel *channel) +{ + int ret; + + assert(channel); - ret = timer_settime(channel->live_timer, 0, &its, NULL); + ret = consumer_channel_timer_stop(&channel->live_timer, + LTTNG_CONSUMER_SIG_LIVE); if (ret == -1) { - PERROR("timer_settime"); + ERR("Failed to stop live timer"); } + + channel->live_timer_enabled = 0; } /* - * Stop and delete timer. + * Set the channel's monitoring timer. + * + * Returns a negative value on error, 0 if a timer was created, and + * a positive value if no timer was created (not an error). */ -void consumer_timer_live_stop(struct lttng_consumer_channel *channel) +int consumer_timer_monitor_start(struct lttng_consumer_channel *channel, + unsigned int monitor_timer_interval_us) { int ret; assert(channel); + assert(channel->key); + assert(!channel->monitor_timer_enabled); - ret = timer_delete(channel->live_timer); + ret = consumer_channel_timer_start(&channel->monitor_timer, channel, + monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR); + channel->monitor_timer_enabled = !!(ret == 0); + return ret; +} + +/* + * Stop and delete the channel's monitoring timer. + */ +int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel) +{ + int ret; + + assert(channel); + assert(channel->monitor_timer_enabled); + + ret = consumer_channel_timer_stop(&channel->monitor_timer, + LTTNG_CONSUMER_SIG_MONITOR); if (ret == -1) { - PERROR("timer_delete"); + ERR("Failed to stop live timer"); + goto end; } - consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE); - - channel->live_timer = 0; - channel->live_timer_enabled = 0; + channel->monitor_timer_enabled = 0; +end: + return ret; } /* @@ -544,9 +627,165 @@ int consumer_signal_init(void) return 0; } +static +int sample_channel_positions(struct lttng_consumer_channel *channel, + uint64_t *_highest_use, uint64_t *_lowest_use, + sample_positions_cb sample, get_consumed_cb get_consumed, + get_produced_cb get_produced) +{ + int ret; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + bool empty_channel = true; + uint64_t high = 0, low = UINT64_MAX; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + rcu_read_lock(); + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, + &iter.iter, stream, node_channel_id.node) { + unsigned long produced, consumed, usage; + + empty_channel = false; + + pthread_mutex_lock(&stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } + + ret = sample(stream); + if (ret) { + ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret); + pthread_mutex_unlock(&stream->lock); + goto end; + } + ret = get_consumed(stream, &consumed); + if (ret) { + ERR("Failed to get buffer consumed position in monitor timer"); + pthread_mutex_unlock(&stream->lock); + goto end; + } + ret = get_produced(stream, &produced); + if (ret) { + ERR("Failed to get buffer produced position in monitor timer"); + pthread_mutex_unlock(&stream->lock); + goto end; + } + + usage = produced - consumed; + high = (usage > high) ? usage : high; + low = (usage < low) ? usage : low; + next: + pthread_mutex_unlock(&stream->lock); + } + + *_highest_use = high; + *_lowest_use = low; +end: + rcu_read_unlock(); + if (empty_channel) { + ret = -1; + } + return ret; +} + +/* + * Execute action on a monitor timer. + */ +static +void monitor_timer(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_channel *channel) +{ + int ret; + int channel_monitor_pipe = + consumer_timer_thread_get_channel_monitor_pipe(); + struct lttcomm_consumer_channel_monitor_msg msg = { + .key = channel->key, + }; + sample_positions_cb sample; + get_consumed_cb get_consumed; + get_produced_cb get_produced; + + assert(channel); + pthread_mutex_lock(&consumer_data.lock); + + if (channel_monitor_pipe < 0) { + goto end; + } + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + sample = lttng_kconsumer_sample_snapshot_positions; + get_consumed = lttng_kconsumer_get_consumed_snapshot; + get_produced = lttng_kconsumer_get_produced_snapshot; + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + sample = lttng_ustconsumer_sample_snapshot_positions; + get_consumed = lttng_ustconsumer_get_consumed_snapshot; + get_produced = lttng_ustconsumer_get_produced_snapshot; + break; + default: + abort(); + } + + ret = sample_channel_positions(channel, &msg.highest, &msg.lowest, + sample, get_consumed, get_produced); + if (ret) { + goto end; + } + + /* + * Writes performed here are assumed to be atomic which is only + * guaranteed for sizes < than PIPE_BUF. + */ + assert(sizeof(msg) <= PIPE_BUF); + + do { + ret = write(channel_monitor_pipe, &msg, sizeof(msg)); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + if (errno == EAGAIN) { + /* Not an error, the sample is merely dropped. */ + DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64, + channel->key); + } else { + PERROR("write to the channel monitor pipe"); + } + } else { + DBG("Sent channel monitoring sample for channel key %" PRIu64 + ", (highest = %" PRIu64 ", lowest = %"PRIu64")", + channel->key, msg.highest, msg.lowest); + } +end: + pthread_mutex_unlock(&consumer_data.lock); +} + +int consumer_timer_thread_get_channel_monitor_pipe(void) +{ + return uatomic_read(&channel_monitor_pipe); +} + +int consumer_timer_thread_set_channel_monitor_pipe(int fd) +{ + int ret; + + ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd); + if (ret != -1) { + ret = -1; + goto end; + } + ret = 0; +end: + return ret; +} + /* * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH, - * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE. + * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and + * LTTNG_CONSUMER_SIG_MONITOR. */ void *consumer_timer_thread(void *data) { @@ -575,20 +814,31 @@ void *consumer_timer_thread(void *data) health_poll_entry(); signr = sigwaitinfo(&mask, &info); health_poll_exit(); + + /* + * NOTE: cascading conditions are used instead of a switch case + * since the use of SIGRTMIN in the definition of the signals' + * values prevents the reduction to an integer constant. + */ if (signr == -1) { if (errno != EINTR) { PERROR("sigwaitinfo"); } continue; } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) { - metadata_switch_timer(ctx, info.si_signo, &info, NULL); + metadata_switch_timer(ctx, info.si_signo, &info); } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) { cmm_smp_mb(); CMM_STORE_SHARED(timer_signal.qs_done, 1); cmm_smp_mb(); DBG("Signal timer metadata thread teardown"); } else if (signr == LTTNG_CONSUMER_SIG_LIVE) { - live_timer(ctx, info.si_signo, &info, NULL); + live_timer(ctx, info.si_signo, &info); + } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) { + struct lttng_consumer_channel *channel; + + channel = info.si_value.sival_ptr; + monitor_timer(ctx, channel); } else { ERR("Unexpected signal %d\n", info.si_signo); } diff --git a/src/common/consumer/consumer-timer.h b/src/common/consumer/consumer-timer.h index 22e74574c..851a172aa 100644 --- a/src/common/consumer/consumer-timer.h +++ b/src/common/consumer/consumer-timer.h @@ -27,6 +27,7 @@ #define LTTNG_CONSUMER_SIG_SWITCH SIGRTMIN + 10 #define LTTNG_CONSUMER_SIG_TEARDOWN SIGRTMIN + 11 #define LTTNG_CONSUMER_SIG_LIVE SIGRTMIN + 12 +#define LTTNG_CONSUMER_SIG_MONITOR SIGRTMIN + 13 #define CLOCKID CLOCK_MONOTONIC @@ -44,15 +45,21 @@ struct timer_signal_data { }; void consumer_timer_switch_start(struct lttng_consumer_channel *channel, - unsigned int switch_timer_interval); + unsigned int switch_timer_interval_us); void consumer_timer_switch_stop(struct lttng_consumer_channel *channel); void consumer_timer_live_start(struct lttng_consumer_channel *channel, - int live_timer_interval); + unsigned int live_timer_interval_us); void consumer_timer_live_stop(struct lttng_consumer_channel *channel); +int consumer_timer_monitor_start(struct lttng_consumer_channel *channel, + unsigned int monitor_timer_interval_us); +int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel); void *consumer_timer_thread(void *data); int consumer_signal_init(void); int consumer_flush_kernel_index(struct lttng_consumer_stream *stream); int consumer_flush_ust_index(struct lttng_consumer_stream *stream); +int consumer_timer_thread_get_channel_monitor_pipe(void); +int consumer_timer_thread_set_channel_monitor_pipe(int fd); + #endif /* CONSUMER_TIMER_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index e379171ae..3b857dec3 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -368,6 +368,9 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) if (channel->live_timer_enabled == 1) { consumer_timer_live_stop(channel); } + if (channel->monitor_timer_enabled == 1) { + consumer_timer_monitor_stop(channel); + } switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -1348,6 +1351,8 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_metadata_pipe; } + ctx->channel_monitor_pipe = -1; + return ctx; error_metadata_pipe: diff --git a/src/common/defaults.h b/src/common/defaults.h index 818332f54..e0d0d86dc 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -218,6 +218,7 @@ /* See lttng-kernel.h enum lttng_kernel_output for channel output */ #define DEFAULT_KERNEL_CHANNEL_OUTPUT LTTNG_EVENT_SPLICE #define DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER CONFIG_DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER +#define DEFAULT_KERNEL_CHANNEL_MONITOR_TIMER CONFIG_DEFAULT_KERNEL_CHANNEL_MONITOR_TIMER #define DEFAULT_KERNEL_CHANNEL_READ_TIMER CONFIG_DEFAULT_KERNEL_CHANNEL_READ_TIMER #define DEFAULT_KERNEL_CHANNEL_LIVE_TIMER CONFIG_DEFAULT_KERNEL_CHANNEL_LIVE_TIMER @@ -237,6 +238,8 @@ #define DEFAULT_UST_UID_CHANNEL_SWITCH_TIMER CONFIG_DEFAULT_UST_UID_CHANNEL_SWITCH_TIMER #define DEFAULT_UST_PID_CHANNEL_LIVE_TIMER CONFIG_DEFAULT_UST_PID_CHANNEL_LIVE_TIMER #define DEFAULT_UST_UID_CHANNEL_LIVE_TIMER CONFIG_DEFAULT_UST_UID_CHANNEL_LIVE_TIMER +#define DEFAULT_UST_PID_CHANNEL_MONITOR_TIMER CONFIG_DEFAULT_UST_PID_CHANNEL_MONITOR_TIMER +#define DEFAULT_UST_UID_CHANNEL_MONITOR_TIMER CONFIG_DEFAULT_UST_UID_CHANNEL_MONITOR_TIMER #define DEFAULT_UST_PID_CHANNEL_READ_TIMER CONFIG_DEFAULT_UST_PID_CHANNEL_READ_TIMER #define DEFAULT_UST_UID_CHANNEL_READ_TIMER CONFIG_DEFAULT_UST_UID_CHANNEL_READ_TIMER diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 1a786352d..3554b8f05 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -68,6 +68,19 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) return ret; } +/* + * Sample consumed and produced positions for a specific fd. + * + * Returns 0 on success, < 0 on error. + */ +int lttng_kconsumer_sample_snapshot_positions( + struct lttng_consumer_stream *stream) +{ + assert(stream); + + return kernctl_snapshot_sample_positions(stream->wait_fd); +} + /* * Get the produced position * @@ -534,9 +547,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = consumer_add_channel(new_channel, ctx); } - if (CONSUMER_CHANNEL_TYPE_DATA) { + if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) { + int monitor_start_ret; + + DBG("Consumer starting monitor timer"); consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval); + monitor_start_ret = consumer_timer_monitor_start( + new_channel, + msg.u.channel.monitor_timer_interval); + if (monitor_start_ret < 0) { + ERR("Starting channel monitoring timer failed"); + goto end_nosignal; + } + } health_code_update(); diff --git a/src/common/kernel-consumer/kernel-consumer.h b/src/common/kernel-consumer/kernel-consumer.h index a07f52188..fe6923678 100644 --- a/src/common/kernel-consumer/kernel-consumer.h +++ b/src/common/kernel-consumer/kernel-consumer.h @@ -1,6 +1,7 @@ /* * Copyright (C) 2011 - Julien Desfossez * Copyright (C) 2011 - Mathieu Desnoyers + * Copyright (C) 2017 - Jérémie Galarneau * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2 only, @@ -22,6 +23,8 @@ #include int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream); +int lttng_kconsumer_sample_snapshot_positions( + struct lttng_consumer_stream *stream); int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 0c2667031..f6179f31d 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -29,6 +29,8 @@ #include #include #include +#include +#include #include #include #include @@ -96,6 +98,8 @@ enum lttcomm_sessiond_command { LTTNG_SET_SESSION_SHM_PATH = 40, LTTNG_REGENERATE_METADATA = 41, LTTNG_REGENERATE_STATEDUMP = 42, + LTTNG_REGISTER_TRIGGER = 43, + LTTNG_UNREGISTER_TRIGGER = 44, }; enum lttcomm_relayd_command { @@ -146,6 +150,7 @@ enum lttcomm_return_code { LTTCOMM_CONSUMERD_RELAYD_FAIL, /* Error on remote relayd */ LTTCOMM_CONSUMERD_CHANNEL_FAIL, /* Channel creation failed. */ LTTCOMM_CONSUMERD_CHAN_NOT_FOUND, /* Channel not found. */ + LTTCOMM_CONSUMERD_ALREADY_SET, /* Resource already set. */ /* MUST be last element */ LTTCOMM_NR, /* Last element */ @@ -269,6 +274,8 @@ struct lttcomm_session_msg { /* Create channel */ struct { struct lttng_channel chan LTTNG_PACKED; + /* struct lttng_channel_extended is already packed. */ + struct lttng_channel_extended extended; } LTTNG_PACKED channel; /* Context */ struct { @@ -311,6 +318,9 @@ struct lttcomm_session_msg { struct { uint32_t pid; } LTTNG_PACKED pid_tracker; + struct { + uint32_t length; + } LTTNG_PACKED trigger; } u; } LTTNG_PACKED; @@ -373,14 +383,6 @@ struct lttcomm_event_extended_header { uint32_t nb_exclusions; } LTTNG_PACKED; -/* - * Channel extended info. - */ -struct lttcomm_channel_extended { - uint64_t discarded_events; - uint64_t lost_packets; -} LTTNG_PACKED; - /* * Data structure for the response from sessiond to the lttng client. */ @@ -423,6 +425,8 @@ struct lttcomm_consumer_msg { uint32_t monitor; /* timer to check the streams usage in live mode (usec). */ unsigned int live_timer_interval; + /* timer to sample a channel's positions (usec). */ + unsigned int monitor_timer_interval; } LTTNG_PACKED channel; /* Only used by Kernel. */ struct { uint64_t stream_key; @@ -453,7 +457,8 @@ struct lttcomm_consumer_msg { int32_t overwrite; /* 1: overwrite, 0: discard */ uint32_t switch_timer_interval; /* usec */ uint32_t read_timer_interval; /* usec */ - unsigned int live_timer_interval; /* usec */ + unsigned int live_timer_interval; /* usec */ + uint32_t monitor_timer_interval; /* usec */ int32_t output; /* splice, mmap */ int32_t type; /* metadata or per_cpu */ uint64_t session_id; /* Tracing session id */ @@ -531,6 +536,19 @@ struct lttcomm_consumer_msg { } u; } LTTNG_PACKED; +/* + * Channel monitoring message returned to the session daemon on every + * monitor timer expiration. + */ +struct lttcomm_consumer_channel_monitor_msg { + /* Key of the sampled channel. */ + uint64_t key; + /* + * Lowest and highest usage (bytes) at the moment the sample was taken. + */ + uint64_t lowest, highest; +} LTTNG_PACKED; + /* * Status message returned to the sessiond after a received command. */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 7006e1cb4..147fe8aaf 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1502,8 +1502,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_timer_switch_start(channel, attr.switch_timer_interval); attr.switch_timer_interval = 0; } else { + int monitor_start_ret; + consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval); + monitor_start_ret = consumer_timer_monitor_start( + channel, + msg.u.ask_channel.monitor_timer_interval); + if (monitor_start_ret < 0) { + ERR("Starting channel monitoring timer failed"); + goto end_channel_error; + } } health_code_update(); @@ -1526,6 +1535,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (channel->live_timer_enabled == 1) { consumer_timer_live_stop(channel); } + if (channel->monitor_timer_enabled == 1) { + consumer_timer_monitor_stop(channel); + } goto end_channel_error; } @@ -1984,7 +1996,7 @@ void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream) } /* - * Take a snapshot for a specific fd + * Take a snapshot for a specific stream. * * Returns 0 on success, < 0 on error */ @@ -1996,6 +2008,20 @@ int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream) return ustctl_snapshot(stream->ustream); } +/* + * Sample consumed and produced positions for a specific stream. + * + * Returns 0 on success, < 0 on error. + */ +int lttng_ustconsumer_sample_snapshot_positions( + struct lttng_consumer_stream *stream) +{ + assert(stream); + assert(stream->ustream); + + return ustctl_snapshot_sample_positions(stream->ustream); +} + /* * Get the produced position * diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h index 67b5bb511..b5ff16186 100644 --- a/src/common/ust-consumer/ust-consumer.h +++ b/src/common/ust-consumer/ust-consumer.h @@ -26,6 +26,8 @@ #ifdef HAVE_LIBLTTNG_UST_CTL int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream); +int lttng_ustconsumer_sample_snapshot_positions( + struct lttng_consumer_stream *stream); int lttng_ustconsumer_get_produced_snapshot( struct lttng_consumer_stream *stream, unsigned long *pos); @@ -96,6 +98,13 @@ int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream) return -ENOSYS; } +static inline +int lttng_ustconsumer_sample_snapshot_positions( + struct lttng_consumer_stream *stream) +{ + return -ENOSYS; +} + static inline int lttng_ustconsumer_get_produced_snapshot( struct lttng_consumer_stream *stream, unsigned long *pos) @@ -103,6 +112,13 @@ int lttng_ustconsumer_get_produced_snapshot( return -ENOSYS; } +static inline +int lttng_ustconsumer_get_consumed_snapshot( + struct lttng_consumer_stream *stream, unsigned long *pos) +{ + return -ENOSYS; +} + static inline int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll)