Fix: read subbuffer mmap/splice signedness issue
[lttng-tools.git] / src / common / consumer.c
index 152064078d34b354263a79992e93d965a5b738a4..b942778b32e9a768f7da0029507b52af10478804 100644 (file)
@@ -45,6 +45,7 @@
 
 #include "consumer.h"
 #include "consumer-stream.h"
+#include "consumer-testpoint.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -768,6 +769,44 @@ end:
        return ret;
 }
 
+/*
+ * Find a relayd and send the streams sent message
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
+{
+       int ret = 0;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(net_seq_idx != -1ULL);
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(net_seq_idx);
+       if (relayd != NULL) {
+               /* Add stream on the relayd */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_streams_sent(&relayd->control_sock);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       goto end;
+               }
+       } else {
+               ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
+                               net_seq_idx);
+               ret = -1;
+               goto end;
+       }
+
+       ret = 0;
+       DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
 /*
  * Find a relayd and close the stream
  */
@@ -1273,6 +1312,57 @@ error:
        return NULL;
 }
 
+/*
+ * Iterate over all streams of the hashtable and free them properly.
+ */
+static void destroy_data_stream_ht(struct lttng_ht *ht)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+
+       if (ht == NULL) {
+               return;
+       }
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+               /*
+                * Ignore return value since we are currently cleaning up so any error
+                * can't be handled.
+                */
+               (void) consumer_del_stream(stream, ht);
+       }
+       rcu_read_unlock();
+
+       lttng_ht_destroy(ht);
+}
+
+/*
+ * Iterate over all streams of the metadata hashtable and free them
+ * properly.
+ */
+static void destroy_metadata_stream_ht(struct lttng_ht *ht)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+
+       if (ht == NULL) {
+               return;
+       }
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+               /*
+                * Ignore return value since we are currently cleaning up so any error
+                * can't be handled.
+                */
+               (void) consumer_del_metadata_stream(stream, ht);
+       }
+       rcu_read_unlock();
+
+       lttng_ht_destroy(ht);
+}
+
 /*
  * Close all fds associated with the instance and free the context.
  */
@@ -1282,6 +1372,9 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
 
        DBG("Consumer destroying it. Closing everything.");
 
+       destroy_data_stream_ht(data_ht);
+       destroy_metadata_stream_ht(metadata_ht);
+
        ret = close(ctx->consumer_error_socket);
        if (ret) {
                PERROR("close");
@@ -1354,7 +1447,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
                unsigned long padding,
-               struct lttng_packet_index *index)
+               struct ctf_packet_index *index)
 {
        unsigned long mmap_offset;
        void *mmap_base;
@@ -1491,44 +1584,49 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                }
        }
 
-       while (len > 0) {
-               ret = lttng_write(outfd, mmap_base + mmap_offset, len);
-               DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
-               if (ret < len) {
-                       /*
-                        * This is possible if the fd is closed on the other side (outfd)
-                        * or any write problem. It can be verbose a bit for a normal
-                        * execution if for instance the relayd is stopped abruptly. This
-                        * can happen so set this to a DBG statement.
-                        */
-                       DBG("Error in file write mmap");
-                       if (written == 0) {
-                               written = -errno;
-                       }
-                       /* Socket operation failed. We consider the relayd dead */
-                       if (errno == EPIPE || errno == EINVAL) {
-                               relayd_hang_up = 1;
-                               goto write_error;
-                       }
-                       goto end;
-               } else if (ret > len) {
-                       PERROR("Error in file write (ret %zd > len %lu)", ret, len);
-                       written += ret;
-                       goto end;
+       /*
+        * This call guarantee that len or less is returned. It's impossible to
+        * receive a ret value that is bigger than len.
+        */
+       ret = lttng_write(outfd, mmap_base + mmap_offset, len);
+       DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
+       if (ret < 0 || ((size_t) ret != len)) {
+               /*
+                * Report error to caller if nothing was written else at least send the
+                * amount written.
+                */
+               if (ret < 0) {
+                       written = -errno;
                } else {
-                       len -= ret;
-                       mmap_offset += ret;
+                       written = ret;
                }
 
-               /* This call is useless on a socket so better save a syscall. */
-               if (!relayd) {
-                       /* This won't block, but will start writeout asynchronously */
-                       lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
-                                       SYNC_FILE_RANGE_WRITE);
-                       stream->out_fd_offset += ret;
+               /* Socket operation failed. We consider the relayd dead */
+               if (errno == EPIPE || errno == EINVAL) {
+                       /*
+                        * This is possible if the fd is closed on the other side
+                        * (outfd) or any write problem. It can be verbose a bit for a
+                        * normal execution if for instance the relayd is stopped
+                        * abruptly. This can happen so set this to a DBG statement.
+                        */
+                       DBG("Consumer mmap write detected relayd hang up");
+                       relayd_hang_up = 1;
+                       goto write_error;
                }
-               stream->output_written += ret;
-               written += ret;
+
+               /* Unhandled error, print it and stop function right now. */
+               PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
+               goto end;
+       }
+       stream->output_written += ret;
+       written = ret;
+
+       /* This call is useless on a socket so better save a syscall. */
+       if (!relayd) {
+               /* This won't block, but will start writeout asynchronously */
+               lttng_sync_file_range(outfd, stream->out_fd_offset, len,
+                               SYNC_FILE_RANGE_WRITE);
+               stream->out_fd_offset += len;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
@@ -1562,7 +1660,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
                unsigned long padding,
-               struct lttng_packet_index *index)
+               struct ctf_packet_index *index)
 {
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
@@ -1697,11 +1795,11 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice chan to pipe, ret %zd", ret_splice);
                if (ret_splice < 0) {
-                       PERROR("Error in relay splice");
+                       ret = errno;
                        if (written == 0) {
                                written = ret_splice;
                        }
-                       ret = errno;
+                       PERROR("Error in relay splice");
                        goto splice_error;
                }
 
@@ -1727,27 +1825,32 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("Consumer splice pipe to file, ret %zd", ret_splice);
                if (ret_splice < 0) {
-                       PERROR("Error in file splice");
+                       ret = errno;
                        if (written == 0) {
                                written = ret_splice;
                        }
                        /* Socket operation failed. We consider the relayd dead */
-                       if (errno == EBADF || errno == EPIPE) {
+                       if (errno == EBADF || errno == EPIPE || errno == ESPIPE) {
                                WARN("Remote relayd disconnected. Stopping");
                                relayd_hang_up = 1;
                                goto write_error;
                        }
-                       ret = errno;
+                       PERROR("Error in file splice");
                        goto splice_error;
                } else if (ret_splice > len) {
-                       errno = EINVAL;
-                       PERROR("Wrote more data than requested %zd (len: %lu)",
-                                       ret_splice, len);
+                       /*
+                        * We don't expect this code path to be executed but you never know
+                        * so this is an extra protection agains a buggy splice().
+                        */
                        written += ret_splice;
                        ret = errno;
+                       PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
+                                       len);
                        goto splice_error;
+               } else {
+                       /* All good, update current len and continue. */
+                       len -= ret_splice;
                }
-               len -= ret_splice;
 
                /* This call is useless on a socket so better save a syscall. */
                if (!relayd) {
@@ -1760,9 +1863,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                written += ret_splice;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
-
-       ret = ret_splice;
-
        goto end;
 
 write_error:
@@ -1856,60 +1956,6 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
 }
 
-/*
- * Iterate over all streams of the hashtable and free them properly.
- *
- * WARNING: *MUST* be used with data stream only.
- */
-static void destroy_data_stream_ht(struct lttng_ht *ht)
-{
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
-       if (ht == NULL) {
-               return;
-       }
-
-       rcu_read_lock();
-       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_stream(stream, ht);
-       }
-       rcu_read_unlock();
-
-       lttng_ht_destroy(ht);
-}
-
-/*
- * Iterate over all streams of the hashtable and free them properly.
- *
- * XXX: Should not be only for metadata stream or else use an other name.
- */
-static void destroy_stream_ht(struct lttng_ht *ht)
-{
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
-       if (ht == NULL) {
-               return;
-       }
-
-       rcu_read_lock();
-       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_metadata_stream(stream, ht);
-       }
-       rcu_read_unlock();
-
-       lttng_ht_destroy(ht);
-}
-
 void lttng_consumer_close_metadata(void)
 {
        switch (consumer_data.type) {
@@ -2216,14 +2262,12 @@ void *consumer_thread_metadata_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
 
-       health_code_update();
-
-       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
-       if (!metadata_ht) {
-               /* ENOMEM at this point. Better to bail out. */
-               goto end_ht;
+       if (testpoint(consumerd_thread_metadata)) {
+               goto error_testpoint;
        }
 
+       health_code_update();
+
        DBG("Thread metadata poll started");
 
        /* Size is set to 1 for the consumer_metadata pipe */
@@ -2396,8 +2440,7 @@ end:
 
        lttng_poll_clean(&events);
 end_poll:
-       destroy_stream_ht(metadata_ht);
-end_ht:
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -2426,14 +2469,12 @@ void *consumer_thread_data_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
 
-       health_code_update();
-
-       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
-       if (data_ht == NULL) {
-               /* ENOMEM at this point. Better to bail out. */
-               goto end;
+       if (testpoint(consumerd_thread_data)) {
+               goto error_testpoint;
        }
 
+       health_code_update();
+
        local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
        if (local_stream == NULL) {
                PERROR("local_stream malloc");
@@ -2663,8 +2704,7 @@ end:
         */
        (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
 
-       destroy_data_stream_ht(data_ht);
-
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -2765,6 +2805,10 @@ void *consumer_thread_channel_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
 
+       if (testpoint(consumerd_thread_channel)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
@@ -2966,6 +3010,7 @@ end:
 end_poll:
        destroy_channel_ht(channel_ht);
 end_ht:
+error_testpoint:
        DBG("Channel poll thread exiting");
        if (err) {
                health_error();
@@ -3021,6 +3066,10 @@ void *consumer_thread_sessiond_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
 
+       if (testpoint(consumerd_thread_sessiond)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        DBG("Creating command socket %s", ctx->consumer_command_sock_path);
@@ -3157,6 +3206,7 @@ end:
                }
        }
 
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -3218,12 +3268,42 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
 /*
  * Allocate and set consumer data hash tables.
  */
-void lttng_consumer_init(void)
+int lttng_consumer_init(void)
 {
        consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.channel_ht) {
+               goto error;
+       }
+
        consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.relayd_ht) {
+               goto error;
+       }
+
        consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.stream_list_ht) {
+               goto error;
+       }
+
        consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.stream_per_chan_id_ht) {
+               goto error;
+       }
+
+       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!data_ht) {
+               goto error;
+       }
+
+       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!metadata_ht) {
+               goto error;
+       }
+
+       return 0;
+
+error:
+       return -1;
 }
 
 /*
This page took 0.030219 seconds and 5 git commands to generate.