X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=f0fb2dc00f92967efdf7d22df4c7c61e98abd73b;hb=6d5d85c79765744fcad6ba189a256784b825e7bf;hp=92abcf21d07257ce4cfa83ad44dbf79bd833508d;hpb=5d2e1e66a968d9e555f9b8b00d0589ebfaf3de32;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 92abcf21d..f0fb2dc00 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -30,6 +30,8 @@ #include #include "consumer.h" +#include "health.h" +#include "ust-app.h" /* * Receive a reply command status message from the consumer. Consumer socket @@ -230,6 +232,41 @@ error: return ret; } +/* + * Return the consumer socket from the given consumer output with the right + * bitness. On error, returns NULL. + * + * The caller MUST acquire a rcu read side lock and keep it until the socket + * object reference is not needed anymore. + */ +struct consumer_socket *consumer_find_socket_by_bitness(int bits, + struct consumer_output *consumer) +{ + int consumer_fd; + struct consumer_socket *socket = NULL; + + switch (bits) { + case 64: + consumer_fd = uatomic_read(&ust_consumerd64_fd); + break; + case 32: + consumer_fd = uatomic_read(&ust_consumerd32_fd); + break; + default: + assert(0); + goto end; + } + + socket = consumer_find_socket(consumer_fd, consumer); + if (!socket) { + ERR("Consumer socket fd %d not found in consumer obj %p", + consumer_fd, consumer); + } + +end: + return socket; +} + /* * Find a consumer_socket in a consumer_output hashtable. Read side lock must * be acquired before calling this function and across use of the @@ -625,7 +662,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, gid_t gid, uint64_t relayd_id, uint64_t key, - unsigned char *uuid) + unsigned char *uuid, + uint32_t chan_id) { assert(msg); @@ -645,6 +683,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.gid = gid; msg->u.ask_channel.relayd_id = relayd_id; msg->u.ask_channel.key = key; + msg->u.ask_channel.chan_id = chan_id; memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid)); @@ -933,3 +972,147 @@ error_unlock: rcu_read_unlock(); return -1; } + +/* + * Send a flush command to consumer using the given channel key. + * + * Return 0 on success else a negative value. + */ +int consumer_flush_channel(struct consumer_socket *socket, uint64_t key) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + assert(socket->fd >= 0); + + DBG2("Consumer flush channel key %" PRIu64, key); + + msg.cmd_type = LTTNG_CONSUMER_FLUSH_CHANNEL; + msg.u.flush_channel.key = key; + + pthread_mutex_lock(socket->lock); + health_code_update(); + + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto end; + } + +end: + health_code_update(); + pthread_mutex_unlock(socket->lock); + return ret; +} + +/* + * Send a close metdata command to consumer using the given channel key. + * + * Return 0 on success else a negative value. + */ +int consumer_close_metadata(struct consumer_socket *socket, + uint64_t metadata_key) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + assert(socket->fd >= 0); + + DBG2("Consumer close metadata channel key %" PRIu64, metadata_key); + + msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA; + msg.u.close_metadata.key = metadata_key; + + pthread_mutex_lock(socket->lock); + health_code_update(); + + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto end; + } + +end: + health_code_update(); + pthread_mutex_unlock(socket->lock); + return ret; +} + +/* + * Send a setup metdata command to consumer using the given channel key. + * + * Return 0 on success else a negative value. + */ +int consumer_setup_metadata(struct consumer_socket *socket, + uint64_t metadata_key) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + assert(socket->fd >= 0); + + DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key); + + msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA; + msg.u.setup_metadata.key = metadata_key; + + pthread_mutex_lock(socket->lock); + health_code_update(); + + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto end; + } + +end: + health_code_update(); + pthread_mutex_unlock(socket->lock); + return ret; +} + +/* + * Send metadata string to consumer. Socket lock MUST be acquired. + * + * Return 0 on success else a negative value. + */ +int consumer_push_metadata(struct consumer_socket *socket, + uint64_t metadata_key, char *metadata_str, size_t len, + size_t target_offset) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + assert(socket->fd >= 0); + + DBG2("Consumer push metadata to consumer socket %d", socket->fd); + + msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA; + msg.u.push_metadata.key = metadata_key; + msg.u.push_metadata.target_offset = target_offset; + msg.u.push_metadata.len = len; + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0 || len == 0) { + goto end; + } + + DBG3("Consumer pushing metadata on sock %d of len %lu", socket->fd, len); + + ret = lttcomm_send_unix_sock(socket->fd, metadata_str, len); + if (ret < 0) { + goto end; + } + + health_code_update(); + ret = consumer_recv_status_reply(socket); + if (ret < 0) { + goto end; + } + +end: + health_code_update(); + return ret; +}