X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=395657066847944742c6a7a86c72254c0e048022;hb=b31610f2294a6a827fa2d0d19d71199567db8dc5;hp=6eb84d6aee403318df005dad3393c7c24f9e19b7;hpb=5e280d7780be0616d07b9523fc14ec48e2d3caad;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 6eb84d6ae..395657066 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -15,7 +15,6 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include @@ -564,6 +563,8 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) output->net_seq_index = obj->net_seq_index; memcpy(output->subdir, obj->subdir, PATH_MAX); output->snapshot = obj->snapshot; + output->relay_major_version = obj->relay_major_version; + output->relay_minor_version = obj->relay_minor_version; memcpy(&output->dst, &obj->dst, sizeof(output->dst)); ret = consumer_copy_sockets(output, obj); if (ret < 0) { @@ -1353,7 +1354,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname, S_IRWXU | S_IRWXG, uid, gid); if (ret < 0) { - if (ret != -EEXIST) { + if (errno != EEXIST) { ERR("Trace directory creation error"); goto error; } @@ -1370,3 +1371,117 @@ error: health_code_update(); return ret; } + +/* + * Ask the consumer the number of discarded events for a channel. + */ +int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *discarded) +{ + int ret; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + DBG3("Consumer discarded events id %" PRIu64, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS; + msg.u.discarded_events.session_id = session_id; + msg.u.discarded_events.channel_key = channel_key; + + *discarded = 0; + + /* Send command for each consumer */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + uint64_t consumer_discarded = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_discarded, + sizeof(consumer_discarded)); + if (ret < 0) { + ERR("get discarded events"); + pthread_mutex_unlock(socket->lock); + goto end; + } + pthread_mutex_unlock(socket->lock); + *discarded += consumer_discarded; + } + ret = 0; + DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, + *discarded, session_id); + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Ask the consumer the number of lost packets for a channel. + */ +int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *lost) +{ + int ret; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + DBG3("Consumer lost packets id %" PRIu64, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS; + msg.u.lost_packets.session_id = session_id; + msg.u.lost_packets.channel_key = channel_key; + + *lost = 0; + + /* Send command for each consumer */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + uint64_t consumer_lost = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_lost, + sizeof(consumer_lost)); + if (ret < 0) { + ERR("get lost packets"); + pthread_mutex_unlock(socket->lock); + goto end; + } + pthread_mutex_unlock(socket->lock); + *lost += consumer_lost; + } + ret = 0; + DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, + *lost, session_id); + +end: + rcu_read_unlock(); + return ret; +}