Add consumer-stream.c/.h in libconsumer
[lttng-tools.git] / src / common / consumer.c
index 116441171e5aa73d198c2bbd90175da0d9fb03fa..d46f19342a74aadd4a99d8f88221a3913fcd58ce 100644 (file)
@@ -254,10 +254,8 @@ static void free_relayd_rcu(struct rcu_head *head)
 
 /*
  * Destroy and free relayd socket pair object.
- *
- * This function MUST be called with the consumer_data lock acquired.
  */
-static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret;
        struct lttng_ht_iter iter;
@@ -337,7 +335,7 @@ static void cleanup_relayd_ht(void)
 
        cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
                        node.node) {
-               destroy_relayd(relayd);
+               consumer_destroy_relayd(relayd);
        }
 
        rcu_read_unlock();
@@ -404,7 +402,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * Delete the relayd from the relayd hash table, close the sockets and free
         * the object in a RCU call.
         */
-       destroy_relayd(relayd);
+       consumer_destroy_relayd(relayd);
 
        /* Set inactive endpoint to all streams */
        update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
@@ -436,7 +434,7 @@ void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
 
        /* Destroy the relayd if refcount is 0 */
        if (uatomic_read(&relayd->refcount) == 0) {
-               destroy_relayd(relayd);
+               consumer_destroy_relayd(relayd);
        }
 }
 
@@ -539,7 +537,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
                /* Both conditions are met, we destroy the relayd. */
                if (uatomic_read(&relayd->refcount) == 0 &&
                                uatomic_read(&relayd->destroy_flag)) {
-                       destroy_relayd(relayd);
+                       consumer_destroy_relayd(relayd);
                }
        }
        rcu_read_unlock();
@@ -568,7 +566,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                const char *channel_name,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
+               uint64_t relayd_id,
                uint64_t session_id,
                int cpu,
                int *alloc_ret,
@@ -855,7 +853,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                const char *name,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
+               uint64_t relayd_id,
                enum lttng_event_output output,
                uint64_t tracefile_size,
                uint64_t tracefile_count)
@@ -1957,7 +1955,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                /* Both conditions are met, we destroy the relayd. */
                if (uatomic_read(&relayd->refcount) == 0 &&
                                uatomic_read(&relayd->destroy_flag)) {
-                       destroy_relayd(relayd);
+                       consumer_destroy_relayd(relayd);
                }
        }
        rcu_read_unlock();
@@ -3083,29 +3081,36 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
 
        DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
 
-       /* First send a status message before receiving the fds. */
-       ret = consumer_send_status_msg(sock, ret_code);
-       if (ret < 0) {
-               /* Somehow, the session daemon is not responding anymore. */
-               goto error;
-       }
-
        /* Get relayd reference if exists. */
        relayd = consumer_find_relayd(net_seq_idx);
        if (relayd == NULL) {
                /* Not found. Allocate one. */
                relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
                if (relayd == NULL) {
-                       lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
-                       ret = -1;
-                       goto error;
+                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+                       ret = -ENOMEM;
+               } else {
+                       relayd->sessiond_session_id = (uint64_t) sessiond_id;
+                       relayd_created = 1;
                }
-               relayd->sessiond_session_id = (uint64_t) sessiond_id;
-               relayd_created = 1;
+
+               /*
+                * This code path MUST continue to the consumer send status message to
+                * we can notify the session daemon and continue our work without
+                * killing everything.
+                */
+       }
+
+       /* First send a status message before receiving the fds. */
+       ret = consumer_send_status_msg(sock, ret_code);
+       if (ret < 0 || ret_code != LTTNG_OK) {
+               /* Somehow, the session daemon is not responding anymore. */
+               goto error;
        }
 
        /* Poll on consumer socket. */
        if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
                ret = -EINTR;
                goto error;
        }
@@ -3113,15 +3118,31 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        /* Get relayd socket from session daemon */
        ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
        if (ret != sizeof(fd)) {
-               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+               ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
                ret = -1;
                fd = -1;        /* Just in case it gets set with an invalid value. */
-               goto error_close;
+
+               /*
+                * Failing to receive FDs might indicate a major problem such as
+                * reaching a fd limit during the receive where the kernel returns a
+                * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
+                * don't take any chances and stop everything.
+                *
+                * XXX: Feature request #558 will fix that and avoid this possible
+                * issue when reaching the fd limit.
+                */
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+
+               /*
+                * This code path MUST continue to the consumer send status message so
+                * we can send the error to the thread expecting a reply. The above
+                * call will make everything stop.
+                */
        }
 
        /* We have the fds without error. Send status back. */
        ret = consumer_send_status_msg(sock, ret_code);
-       if (ret < 0) {
+       if (ret < 0 || ret_code != LTTNG_OK) {
                /* Somehow, the session daemon is not responding anymore. */
                goto error;
        }
@@ -3221,7 +3242,6 @@ error:
                }
        }
 
-error_close:
        if (relayd_created) {
                free(relayd);
        }
This page took 0.032607 seconds and 5 git commands to generate.