consumerd: cleanup: use buffer view interface for mmap read subbuf
[lttng-tools.git] / src / common / consumer / consumer.c
index fbbe8ea8fe8907b71db0a59cc5737faad192b04a..d70a5d1d406b758e1b02cc61f22b440ee006a7b1 100644 (file)
@@ -1,20 +1,10 @@
 /*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
- *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *               2012 - David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
  *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License, version 2 only,
- * as published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
  *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
 #define _LGPL_SOURCE
@@ -91,6 +81,20 @@ int consumer_quit;
 static struct lttng_ht *metadata_ht;
 static struct lttng_ht *data_ht;
 
+static const char *get_consumer_domain(void)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return DEFAULT_KERNEL_TRACE_DIR;
+       case LTTNG_CONSUMER64_UST:
+               /* Fall-through. */
+       case LTTNG_CONSUMER32_UST:
+               return DEFAULT_UST_TRACE_DIR;
+       default:
+               abort();
+       }
+}
+
 /*
  * Notify a thread lttng pipe to poll back again. This usually means that some
  * global state has changed so we just send back the thread in a poll wait
@@ -816,7 +820,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                /* Add stream on the relayd */
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_add_stream(&relayd->control_sock, stream->name,
-                               path, &stream->relayd_stream_id,
+                               get_consumer_domain(), path, &stream->relayd_stream_id,
                                stream->chan->tracefile_size,
                                stream->chan->tracefile_count,
                                stream->trace_chunk);
@@ -1666,18 +1670,19 @@ end:
  */
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len,
+               struct lttng_consumer_stream *stream,
+               const struct lttng_buffer_view *buffer,
                unsigned long padding,
                struct ctf_packet_index *index)
 {
-       unsigned long mmap_offset;
-       void *mmap_base;
        ssize_t ret = 0;
        off_t orig_offset = stream->out_fd_offset;
        /* Default is on the disk */
        int outfd = stream->out_fd;
        struct consumer_relayd_sock_pair *relayd = NULL;
        unsigned int relayd_hang_up = 0;
+       const size_t subbuf_content_size = buffer->size - padding;
+       size_t write_len;
 
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
@@ -1693,39 +1698,9 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                }
        }
 
-       /* get the offset inside the fd to mmap */
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               mmap_base = stream->mmap_base;
-               ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
-               if (ret < 0) {
-                       PERROR("tracer ctl get_mmap_read_offset");
-                       goto end;
-               }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               mmap_base = lttng_ustctl_get_mmap_base(stream);
-               if (!mmap_base) {
-                       ERR("read mmap get mmap base for stream %s", stream->name);
-                       ret = -EPERM;
-                       goto end;
-               }
-               ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
-               if (ret != 0) {
-                       PERROR("tracer ctl get_mmap_read_offset");
-                       ret = -EINVAL;
-                       goto end;
-               }
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-       }
-
        /* Handle stream on the relayd if the output is on the network */
        if (relayd) {
-               unsigned long netlen = len;
+               unsigned long netlen = subbuf_content_size;
 
                /*
                 * Lock the control socket for the complete duration of the function
@@ -1763,10 +1738,10 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                                goto write_error;
                        }
                }
-       } else {
-               /* No streaming, we have to set the len with the full padding */
-               len += padding;
 
+               write_len = subbuf_content_size;
+       } else {
+               /* No streaming; we have to write the full padding. */
                if (stream->metadata_flag && stream->reset_metadata_flag) {
                        ret = utils_truncate_stream_file(stream->out_fd, 0);
                        if (ret < 0) {
@@ -1780,7 +1755,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                 * Check if we need to change the tracefile before writing the packet.
                 */
                if (stream->chan->tracefile_size > 0 &&
-                               (stream->tracefile_size_current + len) >
+                               (stream->tracefile_size_current + buffer->size) >
                                stream->chan->tracefile_size) {
                        ret = consumer_stream_rotate_output_files(stream);
                        if (ret) {
@@ -1789,19 +1764,21 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        outfd = stream->out_fd;
                        orig_offset = 0;
                }
-               stream->tracefile_size_current += len;
+               stream->tracefile_size_current += buffer->size;
                if (index) {
                        index->offset = htobe64(stream->out_fd_offset);
                }
+
+               write_len = buffer->size;
        }
 
        /*
         * This call guarantee that len or less is returned. It's impossible to
         * receive a ret value that is bigger than len.
         */
-       ret = lttng_write(outfd, mmap_base + mmap_offset, len);
-       DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
-       if (ret < 0 || ((size_t) ret != len)) {
+       ret = lttng_write(outfd, buffer->data, write_len);
+       DBG("Consumer mmap write() ret %zd (len %lu)", ret, write_len);
+       if (ret < 0 || ((size_t) ret != write_len)) {
                /*
                 * Report error to caller if nothing was written else at least send the
                 * amount written.
@@ -1822,7 +1799,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        DBG("Consumer mmap write detected relayd hang up");
                } else {
                        /* Unhandled error, print it and stop function right now. */
-                       PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
+                       PERROR("Error in write mmap (ret %zd != write_len %zu)", ret,
+                                       write_len);
                }
                goto write_error;
        }
@@ -1831,9 +1809,9 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* This call is useless on a socket so better save a syscall. */
        if (!relayd) {
                /* This won't block, but will start writeout asynchronously */
-               lttng_sync_file_range(outfd, stream->out_fd_offset, len,
+               lttng_sync_file_range(outfd, stream->out_fd_offset, write_len,
                                SYNC_FILE_RANGE_WRITE);
-               stream->out_fd_offset += len;
+               stream->out_fd_offset += write_len;
                lttng_consumer_sync_trace_file(stream, orig_offset);
        }
 
@@ -3949,8 +3927,18 @@ int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_act
                } else {
                        ret = kernctl_buffer_flush_empty(stream->wait_fd);
                        if (ret < 0) {
-                               ERR("Failed to flush kernel stream");
-                               goto end;
+                               /*
+                                * Doing a buffer flush which does not take into
+                                * account empty packets. This is not perfect,
+                                * but required as a fall-back when
+                                * "flush_empty" is not implemented by
+                                * lttng-modules.
+                                */
+                               ret = kernctl_buffer_flush(stream->wait_fd);
+                               if (ret < 0) {
+                                       ERR("Failed to flush kernel stream");
+                                       goto end;
+                               }
                        }
                }
                break;
@@ -4070,7 +4058,13 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                 */
                produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
                if (consumed_pos == produced_pos) {
+                       DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
+                                       stream->key, produced_pos, consumed_pos);
                        stream->rotate_ready = true;
+               } else {
+                       DBG("Different consumed and produced positions "
+                                       "for stream %" PRIu64 " produced = %lu consumed = %lu",
+                                       stream->key, produced_pos, consumed_pos);
                }
                /*
                 * The rotation position is based on the packet_seq_num of the
@@ -4095,6 +4089,8 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                }
                stream->rotate_position = stream->last_sequence_number + 1 +
                                ((produced_pos - consumed_pos) / stream->max_sb_size);
+               DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
+                               stream->key, stream->rotate_position);
 
                if (!is_local_trace) {
                        /*
@@ -4159,6 +4155,110 @@ end:
        return ret;
 }
 
+static
+int consumer_clear_buffer(struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+       unsigned long consumed_pos_before, consumed_pos_after;
+
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Taking snapshot positions");
+               goto end;
+       }
+
+       ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
+       if (ret < 0) {
+               ERR("Consumed snapshot position");
+               goto end;
+       }
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               ret = kernctl_buffer_clear(stream->wait_fd);
+               if (ret < 0) {
+                       ERR("Failed to clear kernel stream (ret = %d)", ret);
+                       goto end;
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_clear_buffer(stream);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Taking snapshot positions");
+               goto end;
+       }
+       ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
+       if (ret < 0) {
+               ERR("Consumed snapshot position");
+               goto end;
+       }
+       DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
+end:
+       return ret;
+}
+
+static
+int consumer_clear_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       ret = consumer_flush_buffer(stream, 1);
+       if (ret < 0) {
+               ERR("Failed to flush stream %" PRIu64 " during channel clear",
+                               stream->key);
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+       ret = consumer_clear_buffer(stream);
+       if (ret < 0) {
+               ERR("Failed to clear stream %" PRIu64 " during channel clear",
+                               stream->key);
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+       return ret;
+}
+
+static
+int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+       struct lttng_consumer_stream *stream;
+
+       rcu_read_lock();
+       pthread_mutex_lock(&channel->lock);
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               health_code_update();
+               pthread_mutex_lock(&stream->lock);
+               ret = consumer_clear_stream(stream);
+               if (ret) {
+                       goto error_unlock;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       return 0;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       return ret;
+}
+
 /*
  * Check if a stream is ready to be rotated after extracting it.
  *
@@ -4167,6 +4267,11 @@ end:
  */
 int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
 {
+       DBG("Check is rotate ready for stream %" PRIu64
+                       " ready %u rotate_position %" PRIu64
+                       " last_sequence_number %" PRIu64,
+                       stream->key, stream->rotate_ready,
+                       stream->rotate_position, stream->last_sequence_number);
        if (stream->rotate_ready) {
                return 1;
        }
@@ -4195,6 +4300,12 @@ int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
         * but consumerd considers rotation ready when reaching the last
         * packet of the current chunk, hence the "rotate_position - 1".
         */
+
+       DBG("Check is rotate ready for stream %" PRIu64
+                       " last_sequence_number %" PRIu64
+                       " rotate_position %" PRIu64,
+                       stream->key, stream->last_sequence_number,
+                       stream->rotate_position);
        if (stream->last_sequence_number >= stream->rotate_position - 1) {
                return 1;
        }
@@ -4207,6 +4318,8 @@ int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
  */
 void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
 {
+       DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64,
+                       stream->key);
        stream->rotate_position = -1ULL;
        stream->rotate_ready = false;
 }
@@ -4459,7 +4572,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk(
         * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
         */
        created_chunk = lttng_trace_chunk_create(chunk_id,
-                       chunk_creation_timestamp);
+                       chunk_creation_timestamp, NULL);
        if (!created_chunk) {
                ERR("Failed to create trace chunk");
                ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
@@ -4790,3 +4903,67 @@ end_rcu_unlock:
 end:
        return ret_code;
 }
+
+static
+int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
+{
+       struct lttng_ht *ht;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht_iter iter;
+       int ret;
+
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key,
+                       &iter.iter, stream, node_channel_id.node) {
+               /*
+                * Protect against teardown with mutex.
+                */
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+               ret = consumer_clear_stream(stream);
+               if (ret) {
+                       goto error_unlock;
+               }
+       next:
+               pthread_mutex_unlock(&stream->lock);
+       }
+       rcu_read_unlock();
+       return LTTCOMM_CONSUMERD_SUCCESS;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       rcu_read_unlock();
+       return ret;
+}
+
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       DBG("Consumer clear channel %" PRIu64, channel->key);
+
+       if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+               /*
+                * Nothing to do for the metadata channel/stream.
+                * Snapshot mechanism already take care of the metadata
+                * handling/generation, and monitored channels only need to
+                * have their data stream cleared..
+                */
+               ret = LTTCOMM_CONSUMERD_SUCCESS;
+               goto end;
+       }
+
+       if (!channel->monitor) {
+               ret = consumer_clear_unmonitored_channel(channel);
+       } else {
+               ret = consumer_clear_monitored_channel(channel);
+       }
+end:
+       return ret;
+}
This page took 0.032814 seconds and 5 git commands to generate.