#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
+#include <inttypes.h>
#include <common/common.h>
#include <common/defaults.h>
#include <common/uri.h>
#include "consumer.h"
+#include "health.h"
+#include "ust-app.h"
/*
* Receive a reply command status message from the consumer. Consumer socket
* negative value is sent back and both parameters are untouched.
*/
int consumer_recv_status_channel(struct consumer_socket *sock,
- unsigned long *key, unsigned int *stream_count)
+ uint64_t *key, unsigned int *stream_count)
{
int ret;
struct lttcomm_consumer_status_channel reply;
return ret;
}
+/*
+ * Return the consumer socket from the given consumer output with the right
+ * bitness. On error, returns NULL.
+ *
+ * The caller MUST acquire a rcu read side lock and keep it until the socket
+ * object reference is not needed anymore.
+ */
+struct consumer_socket *consumer_find_socket_by_bitness(int bits,
+ struct consumer_output *consumer)
+{
+ int consumer_fd;
+ struct consumer_socket *socket = NULL;
+
+ switch (bits) {
+ case 64:
+ consumer_fd = uatomic_read(&ust_consumerd64_fd);
+ break;
+ case 32:
+ consumer_fd = uatomic_read(&ust_consumerd32_fd);
+ break;
+ default:
+ assert(0);
+ goto end;
+ }
+
+ socket = consumer_find_socket(consumer_fd, consumer);
+ if (!socket) {
+ ERR("Consumer socket fd %d not found in consumer obj %p",
+ consumer_fd, consumer);
+ }
+
+end:
+ return socket;
+}
+
/*
* Find a consumer_socket in a consumer_output hashtable. Read side lock must
* be acquired before calling this function and across use of the
/* By default, consumer output is enabled */
output->enabled = 1;
output->type = type;
- output->net_seq_index = -1;
+ output->net_seq_index = (uint64_t) -1ULL;
output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
const char *name,
uid_t uid,
gid_t gid,
- int relayd_id,
- unsigned long key,
- unsigned char *uuid)
+ uint64_t relayd_id,
+ uint64_t key,
+ unsigned char *uuid,
+ uint32_t chan_id)
{
assert(msg);
msg->u.ask_channel.gid = gid;
msg->u.ask_channel.relayd_id = relayd_id;
msg->u.ask_channel.key = key;
+ msg->u.ask_channel.chan_id = chan_id;
memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
*/
void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
enum lttng_consumer_command cmd,
- int channel_key,
+ uint64_t channel_key,
uint64_t session_id,
const char *pathname,
uid_t uid,
gid_t gid,
- int relayd_id,
+ uint64_t relayd_id,
const char *name,
unsigned int nb_init_streams,
enum lttng_event_output output,
*/
void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
enum lttng_consumer_command cmd,
- int channel_key,
- int stream_key,
+ uint64_t channel_key,
+ uint64_t stream_key,
int cpu)
{
assert(msg);
*/
int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
struct lttcomm_sock *sock, struct consumer_output *consumer,
- enum lttng_stream_type type, unsigned int session_id)
+ enum lttng_stream_type type, uint64_t session_id)
{
int ret;
struct lttcomm_consumer_msg msg;
* This function has a different behavior with the consumer i.e. that it waits
* for a reply from the consumer if yes or no the data is pending.
*/
-int consumer_is_data_pending(unsigned int id,
+int consumer_is_data_pending(uint64_t session_id,
struct consumer_output *consumer)
{
int ret;
msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
- msg.u.data_pending.session_id = (uint64_t) id;
+ msg.u.data_pending.session_id = session_id;
- DBG3("Consumer data pending for id %u", id);
+ DBG3("Consumer data pending for id %" PRIu64, session_id);
/* Send command for each consumer */
rcu_read_lock();
}
rcu_read_unlock();
- DBG("Consumer data is %s pending for session id %u",
- ret_code == 1 ? "" : "NOT", id);
+ DBG("Consumer data is %s pending for session id %" PRIu64,
+ ret_code == 1 ? "" : "NOT", session_id);
return ret_code;
error_unlock:
rcu_read_unlock();
return -1;
}
+
+/*
+ * Send a flush command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_flush_channel(struct consumer_socket *socket, uint64_t key)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+ assert(socket->fd >= 0);
+
+ DBG2("Consumer flush channel key %" PRIu64, key);
+
+ msg.cmd_type = LTTNG_CONSUMER_FLUSH_CHANNEL;
+ msg.u.flush_channel.key = key;
+
+ pthread_mutex_lock(socket->lock);
+ health_code_update();
+
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ health_code_update();
+ pthread_mutex_unlock(socket->lock);
+ return ret;
+}
+
+/*
+ * Send a close metdata command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_close_metadata(struct consumer_socket *socket,
+ uint64_t metadata_key)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+ assert(socket->fd >= 0);
+
+ DBG2("Consumer close metadata channel key %" PRIu64, metadata_key);
+
+ msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA;
+ msg.u.close_metadata.key = metadata_key;
+
+ pthread_mutex_lock(socket->lock);
+ health_code_update();
+
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ health_code_update();
+ pthread_mutex_unlock(socket->lock);
+ return ret;
+}
+
+/*
+ * Send a setup metdata command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_setup_metadata(struct consumer_socket *socket,
+ uint64_t metadata_key)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+ assert(socket->fd >= 0);
+
+ DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key);
+
+ msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA;
+ msg.u.setup_metadata.key = metadata_key;
+
+ pthread_mutex_lock(socket->lock);
+ health_code_update();
+
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ health_code_update();
+ pthread_mutex_unlock(socket->lock);
+ return ret;
+}
+
+/*
+ * Send metadata string to consumer.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_push_metadata(struct consumer_socket *socket,
+ uint64_t metadata_key, char *metadata_str, size_t len,
+ size_t target_offset)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+ assert(socket->fd >= 0);
+
+ DBG2("Consumer push metadata to consumer socket %d", socket->fd);
+
+ msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA;
+ msg.u.push_metadata.key = metadata_key;
+ msg.u.push_metadata.target_offset = target_offset;
+ msg.u.push_metadata.len = len;
+
+ /*
+ * TODO: reenable these locks when the consumerd gets the ability to
+ * reorder the metadata it receives. This fits with locking in
+ * src/bin/lttng-sessiond/ust-app.c:push_metadata()
+ *
+ * pthread_mutex_lock(socket->lock);
+ */
+
+ health_code_update();
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto end;
+ }
+
+ DBG3("Consumer pushing metadata on sock %d of len %lu", socket->fd, len);
+
+ ret = lttcomm_send_unix_sock(socket->fd, metadata_str, len);
+ if (ret < 0) {
+ goto end;
+ }
+
+ health_code_update();
+ ret = consumer_recv_status_reply(socket);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ health_code_update();
+ /*
+ * pthread_mutex_unlock(socket->lock);
+ */
+ return ret;
+}