X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fust-consumer.c;h=33cb6ff376c16c92db8bf605963054b364b59555;hp=60ec5c0b2d623013d7bc2858c2927e207e1b9e4e;hb=d88aee689d5bd0067f362a323cb69c37717df59f;hpb=d0b96690836f4b876096f3dc14801f8e25281a77 diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 60ec5c0b2..33cb6ff37 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -99,7 +100,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, struct consumer_socket *socket) { int ret; - unsigned long key; + uint64_t key; char *pathname = NULL; struct lttcomm_consumer_msg msg; @@ -151,7 +152,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, /* We need at least one where 1 stream for 1 cpu. */ assert(ua_chan->expected_stream_count > 0); - DBG2("UST ask channel %lu successfully done with %u stream(s)", key, + DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)", key, ua_chan->expected_stream_count); error: @@ -383,3 +384,131 @@ int ust_consumer_send_channel_to_ust(struct ust_app *app, error: return ret; } + +/* + * Send metadata string to consumer. + * + * Return 0 on success else a negative value. + */ +int ust_consumer_push_metadata(struct consumer_socket *socket, + struct ust_app_session *ua_sess, char *metadata_str, + size_t len, size_t target_offset) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + assert(socket->fd >= 0); + assert(ua_sess); + assert(ua_sess->metadata); + + DBG2("UST consumer push metadata to consumer socket %d", socket->fd); + + msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA; + msg.u.push_metadata.key = ua_sess->metadata->key; + msg.u.push_metadata.target_offset = target_offset; + msg.u.push_metadata.len = len; + + /* + * TODO: reenable these locks when the consumerd gets the ability to + * reorder the metadata it receives. This fits with locking in + * src/bin/lttng-sessiond/ust-app.c:push_metadata() + * + * pthread_mutex_lock(socket->lock); + */ + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto error; + } + + DBG3("UST consumer push metadata on sock %d of len %lu", socket->fd, len); + + ret = lttcomm_send_unix_sock(socket->fd, metadata_str, len); + if (ret < 0) { + fprintf(stderr, "send error: %d\n", ret); + goto error; + } + + health_code_update(); + ret = consumer_recv_status_reply(socket); + if (ret < 0) { + goto error; + } + +error: + health_code_update(); + /* + * pthread_mutex_unlock(socket->lock); + */ + return ret; +} + +/* + * Send a close metdata command to consumer using the given channel key. + * + * Return 0 on success else a negative value. + */ +int ust_consumer_close_metadata(struct consumer_socket *socket, + struct ust_app_channel *ua_chan) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(ua_chan); + assert(socket); + assert(socket->fd >= 0); + + DBG2("UST consumer close metadata channel key %lu", ua_chan->key); + + msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA; + msg.u.close_metadata.key = ua_chan->key; + + pthread_mutex_lock(socket->lock); + health_code_update(); + + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto error; + } + +error: + health_code_update(); + pthread_mutex_unlock(socket->lock); + return ret; +} + +/* + * Send a setup metdata command to consumer using the given channel key. + * + * Return 0 on success else a negative value. + */ +int ust_consumer_setup_metadata(struct consumer_socket *socket, + struct ust_app_channel *ua_chan) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(ua_chan); + assert(socket); + assert(socket->fd >= 0); + + DBG2("UST consumer setup metadata channel key %lu", ua_chan->key); + + msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA; + msg.u.setup_metadata.key = ua_chan->key; + + pthread_mutex_lock(socket->lock); + health_code_update(); + + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto error; + } + +error: + health_code_update(); + pthread_mutex_unlock(socket->lock); + return ret; +}