#include "consumer.h"
+/*
+ * Receive a reply command status message from the consumer. Consumer socket
+ * lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success, -1 on recv error or a negative lttng error code which
+ * was possibly returned by the consumer.
+ */
+int consumer_recv_status_reply(struct consumer_socket *sock)
+{
+ int ret;
+ struct lttcomm_consumer_status_msg reply;
+
+ assert(sock);
+
+ ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Don't return 0 which means success. */
+ ret = -1;
+ }
+ /* The above call will print a PERROR on error. */
+ DBG("Fail to receive status reply on sock %d", sock->fd);
+ goto end;
+ }
+
+ if (reply.ret_code == LTTNG_OK) {
+ /* All good. */
+ ret = 0;
+ } else {
+ ret = -reply.ret_code;
+ DBG("Consumer ret code %d", reply.ret_code);
+ }
+
+end:
+ return ret;
+}
+
/*
* Send destroy relayd command to consumer.
*
pthread_mutex_lock(sock->lock);
ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
- pthread_mutex_unlock(sock->lock);
if (ret < 0) {
- PERROR("send consumer destroy relayd command");
- goto error;
+ /* Indicate that the consumer is probably closing at this point. */
+ DBG("send consumer destroy relayd command");
+ goto error_send;
}
+ /* Don't check the return value. The caller will do it. */
+ ret = consumer_recv_status_reply(sock);
+
DBG2("Consumer send destroy relayd command done");
+error_send:
+ pthread_mutex_unlock(sock->lock);
error:
return ret;
}
*/
void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
{
- int ret;
struct lttng_ht_iter iter;
struct consumer_socket *socket;
rcu_read_lock();
cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
node.node) {
+ int ret;
+
/* Send destroy relayd command */
ret = consumer_send_destroy_relayd(socket, consumer);
if (ret < 0) {
- ERR("Unable to send destroy relayd command to consumer");
+ DBG("Unable to send destroy relayd command to consumer");
/* Continue since we MUST delete everything at this point. */
}
}
/*
* Send file descriptor to consumer via sock.
*/
-int consumer_send_fds(int sock, int *fds, size_t nb_fd)
+int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
{
int ret;
assert(fds);
+ assert(sock);
assert(nb_fd > 0);
- ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
+ ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
if (ret < 0) {
- PERROR("send consumer fds");
+ /* The above call will print a PERROR on error. */
+ DBG("Error when sending consumer fds on sock %d", sock->fd);
goto error;
}
+ ret = consumer_recv_status_reply(sock);
+
error:
return ret;
}
/*
* Consumer send channel communication message structure to consumer.
*/
-int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
+int consumer_send_channel(struct consumer_socket *sock,
+ struct lttcomm_consumer_msg *msg)
{
int ret;
assert(msg);
- assert(sock >= 0);
+ assert(sock);
+ assert(sock->fd >= 0);
- ret = lttcomm_send_unix_sock(sock, msg,
+ ret = lttcomm_send_unix_sock(sock->fd, msg,
sizeof(struct lttcomm_consumer_msg));
if (ret < 0) {
- PERROR("send consumer channel");
+ /* The above call will print a PERROR on error. */
+ DBG("Error when sending consumer channel on sock %d", sock->fd);
goto error;
}
+ ret = consumer_recv_status_reply(sock);
+
error:
return ret;
}
/*
* Send stream communication structure to the consumer.
*/
-int consumer_send_stream(int sock, struct consumer_output *dst,
- struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
+int consumer_send_stream(struct consumer_socket *sock,
+ struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
+ int *fds, size_t nb_fd)
{
int ret;
assert(msg);
assert(dst);
+ assert(sock);
switch (dst->type) {
case CONSUMER_DST_NET:
}
/* Send on socket */
- ret = lttcomm_send_unix_sock(sock, msg,
+ ret = lttcomm_send_unix_sock(sock->fd, msg,
sizeof(struct lttcomm_consumer_msg));
if (ret < 0) {
- PERROR("send consumer stream");
+ /* The above call will print a PERROR on error. */
+ DBG("Error when sending consumer stream on sock %d", sock->fd);
+ goto error;
+ }
+
+ ret = consumer_recv_status_reply(sock);
+ if (ret < 0) {
goto error;
}
*
* On success return positive value. On error, negative value.
*/
-int consumer_send_relayd_socket(int consumer_sock,
+int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
struct lttcomm_sock *sock, struct consumer_output *consumer,
- enum lttng_stream_type type)
+ enum lttng_stream_type type, unsigned int session_id)
{
int ret;
struct lttcomm_consumer_msg msg;
/* Code flow error. Safety net. */
assert(sock);
assert(consumer);
+ assert(consumer_sock);
/* Bail out if consumer is disabled */
if (!consumer->enabled) {
*/
msg.u.relayd_sock.net_index = consumer->net_seq_index;
msg.u.relayd_sock.type = type;
+ msg.u.relayd_sock.session_id = session_id;
memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
- DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
- ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
+ DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
+ ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
+ if (ret < 0) {
+ /* The above call will print a PERROR on error. */
+ DBG("Error when sending relayd sockets on sock %d", sock->fd);
+ goto error;
+ }
+
+ ret = consumer_recv_status_reply(consumer_sock);
if (ret < 0) {
- PERROR("send consumer relayd socket info");
goto error;
}
ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
if (ret < 0) {
- PERROR("send consumer data pending command");
+ /* The above call will print a PERROR on error. */
+ DBG("Error on consumer is data pending on sock %d", socket->fd);
pthread_mutex_unlock(socket->lock);
goto error;
}
+ /*
+ * No need for a recv reply status because the answer to the command is
+ * the reply status message.
+ */
+
ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
- if (ret < 0) {
- PERROR("recv consumer data pending status");
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Don't return 0 which means success. */
+ ret = -1;
+ }
+ /* The above call will print a PERROR on error. */
+ DBG("Error on recv consumer is data pending on sock %d", socket->fd);
pthread_mutex_unlock(socket->lock);
goto error;
}
}
}
- DBG("Consumer data pending ret %d", ret_code);
+ DBG("Consumer data is %s pending for session id %u",
+ ret_code == 1 ? "" : "NOT", id);
return ret_code;
error: