consumerd: on_sleep not called on stream when no data is available
[lttng-tools.git] / src / common / consumer / consumer.c
index 814d93e76887c64d1ba8e54ad813d8f909f2850b..8a2a0f9c93c8cea6d8f9c1b77725142ed1673227 100644 (file)
@@ -17,6 +17,7 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
+#include "common/index/ctf-index.h"
 #define _LGPL_SOURCE
 #include <assert.h>
 #include <poll.h>
@@ -320,6 +321,7 @@ static void free_relayd_rcu(struct rcu_head *head)
        (void) relayd_close(&relayd->control_sock);
        (void) relayd_close(&relayd->data_sock);
 
+       pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
        free(relayd);
 }
 
@@ -416,25 +418,24 @@ static void cleanup_relayd_ht(void)
 }
 
 /*
- * Update the end point status of all streams having the given network sequence
- * index (relayd index).
+ * Update the end point status of all streams having the given relayd id.
  *
  * It's atomically set without having the stream mutex locked which is fine
  * because we handle the write/read race with a pipe wakeup for each thread.
  */
-static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
+static void update_endpoint_status_by_netidx(uint64_t relayd_id,
                enum consumer_endpoint_status status)
 {
        struct lttng_ht_iter iter;
        struct lttng_consumer_stream *stream;
 
-       DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
+       DBG("Consumer set delete flag on stream by idx %" PRIu64, relayd_id);
 
        rcu_read_lock();
 
        /* Let's begin with metadata */
        cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
-               if (stream->net_seq_idx == net_seq_idx) {
+               if (stream->relayd_id == relayd_id) {
                        uatomic_set(&stream->endpoint_status, status);
                        DBG("Delete flag set to metadata stream %d", stream->wait_fd);
                }
@@ -442,7 +443,7 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
 
        /* Follow up by the data streams */
        cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
-               if (stream->net_seq_idx == net_seq_idx) {
+               if (stream->relayd_id == relayd_id) {
                        uatomic_set(&stream->endpoint_status, status);
                        DBG("Delete flag set to data stream %d", stream->wait_fd);
                }
@@ -458,17 +459,16 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
  * If a local data context is available, notify the threads that the streams'
  * state have changed.
  */
-static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
-               struct lttng_consumer_local_data *ctx)
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        uint64_t netidx;
 
        assert(relayd);
 
-       DBG("Cleaning up relayd sockets");
+       DBG("Cleaning up relayd object ID %"PRIu64, relayd->id);
 
        /* Save the net sequence index before destroying the object */
-       netidx = relayd->net_seq_idx;
+       netidx = relayd->id;
 
        /*
         * Delete the relayd from the relayd hash table, close the sockets and free
@@ -485,10 +485,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * memory barrier ordering the updates of the end point status from the
         * read of this status which happens AFTER receiving this notify.
         */
-       if (ctx) {
-               notify_thread_lttng_pipe(ctx->consumer_data_pipe);
-               notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
-       }
+       notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
+       notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
 }
 
 /*
@@ -535,92 +533,6 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
        consumer_stream_destroy(stream, metadata_ht);
 }
 
-struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
-               uint64_t stream_key,
-               enum lttng_consumer_stream_state state,
-               const char *channel_name,
-               uid_t uid,
-               gid_t gid,
-               uint64_t relayd_id,
-               uint64_t session_id,
-               int cpu,
-               int *alloc_ret,
-               enum consumer_channel_type type,
-               unsigned int monitor)
-{
-       int ret;
-       struct lttng_consumer_stream *stream;
-
-       stream = zmalloc(sizeof(*stream));
-       if (stream == NULL) {
-               PERROR("malloc struct lttng_consumer_stream");
-               ret = -ENOMEM;
-               goto end;
-       }
-
-       rcu_read_lock();
-
-       stream->key = stream_key;
-       stream->out_fd = -1;
-       stream->out_fd_offset = 0;
-       stream->output_written = 0;
-       stream->state = state;
-       stream->uid = uid;
-       stream->gid = gid;
-       stream->net_seq_idx = relayd_id;
-       stream->session_id = session_id;
-       stream->monitor = monitor;
-       stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
-       stream->index_file = NULL;
-       stream->last_sequence_number = -1ULL;
-       pthread_mutex_init(&stream->lock, NULL);
-       pthread_mutex_init(&stream->metadata_timer_lock, NULL);
-
-       /* If channel is the metadata, flag this stream as metadata. */
-       if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
-               stream->metadata_flag = 1;
-               /* Metadata is flat out. */
-               strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
-               /* Live rendez-vous point. */
-               pthread_cond_init(&stream->metadata_rdv, NULL);
-               pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
-       } else {
-               /* Format stream name to <channel_name>_<cpu_number> */
-               ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
-                               channel_name, cpu);
-               if (ret < 0) {
-                       PERROR("snprintf stream name");
-                       goto error;
-               }
-       }
-
-       /* Key is always the wait_fd for streams. */
-       lttng_ht_node_init_u64(&stream->node, stream->key);
-
-       /* Init node per channel id key */
-       lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
-
-       /* Init session id node with the stream session id */
-       lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
-
-       DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
-                       " relayd_id %" PRIu64 ", session_id %" PRIu64,
-                       stream->name, stream->key, channel_key,
-                       stream->net_seq_idx, stream->session_id);
-
-       rcu_read_unlock();
-       return stream;
-
-error:
-       rcu_read_unlock();
-       free(stream);
-end:
-       if (alloc_ret) {
-               *alloc_ret = ret;
-       }
-       return NULL;
-}
-
 /*
  * Add a stream to the global list protected by a mutex.
  */
@@ -699,7 +611,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd)
        assert(relayd);
 
        lttng_ht_lookup(consumer_data.relayd_ht,
-                       &relayd->net_seq_idx, &iter);
+                       &relayd->id, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                goto end;
@@ -714,12 +626,12 @@ end:
  * Allocate and return a consumer relayd socket.
  */
 static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
-               uint64_t net_seq_idx)
+               uint64_t relayd_id)
 {
        struct consumer_relayd_sock_pair *obj = NULL;
 
        /* net sequence index of -1 is a failure */
-       if (net_seq_idx == (uint64_t) -1ULL) {
+       if (relayd_id == (uint64_t) -1ULL) {
                goto error;
        }
 
@@ -729,12 +641,12 @@ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
                goto error;
        }
 
-       obj->net_seq_idx = net_seq_idx;
+       obj->id = relayd_id;
        obj->refcount = 0;
        obj->destroy_flag = 0;
        obj->control_sock.sock.fd = -1;
        obj->data_sock.sock.fd = -1;
-       lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
+       lttng_ht_node_init_u64(&obj->node, obj->id);
        pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
 
 error:
@@ -782,12 +694,12 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
        struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
-       assert(stream->net_seq_idx != -1ULL);
+       assert(stream->relayd_id != -1ULL);
        assert(path);
 
        /* The stream is not metadata. Get relayd reference if exists. */
        rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
+       relayd = consumer_find_relayd(stream->relayd_id);
        if (relayd != NULL) {
                /* Add stream on the relayd */
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
@@ -796,6 +708,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                                stream->chan->tracefile_size, stream->chan->tracefile_count);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->id);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto end;
                }
 
@@ -803,13 +717,13 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                stream->sent_to_relayd = 1;
        } else {
                ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
-                               stream->key, stream->net_seq_idx);
+                               stream->key, stream->relayd_id);
                ret = -1;
                goto end;
        }
 
        DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
-                       stream->name, stream->key, stream->net_seq_idx);
+                       stream->name, stream->key, stream->relayd_id);
 
 end:
        rcu_read_unlock();
@@ -821,33 +735,35 @@ end:
  *
  * Returns 0 on success, < 0 on error
  */
-int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
+int consumer_send_relayd_streams_sent(uint64_t relayd_id)
 {
        int ret = 0;
        struct consumer_relayd_sock_pair *relayd;
 
-       assert(net_seq_idx != -1ULL);
+       assert(relayd_id != -1ULL);
 
        /* The stream is not metadata. Get relayd reference if exists. */
        rcu_read_lock();
-       relayd = consumer_find_relayd(net_seq_idx);
+       relayd = consumer_find_relayd(relayd_id);
        if (relayd != NULL) {
                /* Add stream on the relayd */
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_streams_sent(&relayd->control_sock);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->id);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto end;
                }
        } else {
                ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
-                               net_seq_idx);
+                               relayd_id);
                ret = -1;
                goto end;
        }
 
        ret = 0;
-       DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
+       DBG("All streams sent relayd id %" PRIu64, relayd_id);
 
 end:
        rcu_read_unlock();
@@ -863,7 +779,7 @@ void close_relayd_stream(struct lttng_consumer_stream *stream)
 
        /* The stream is not metadata. Get relayd reference if exists. */
        rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
+       relayd = consumer_find_relayd(stream->relayd_id);
        if (relayd) {
                consumer_stream_relayd_close(stream, relayd);
        }
@@ -949,6 +865,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id_per_pid,
                unsigned int monitor,
                unsigned int live_timer_interval,
+               bool is_in_live_session,
                const char *root_shm_path,
                const char *shm_path)
 {
@@ -971,6 +888,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->tracefile_count = tracefile_count;
        channel->monitor = monitor;
        channel->live_timer_interval = live_timer_interval;
+       channel->is_live = is_in_live_session;
        pthread_mutex_init(&channel->lock, NULL);
        pthread_mutex_init(&channel->timer_lock, NULL);
 
@@ -1300,7 +1218,7 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
 struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
                ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
-                       struct lttng_consumer_local_data *ctx),
+                       struct lttng_consumer_local_data *ctx, bool locked_by_caller),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
                int (*update_stream)(uint64_t stream_key, uint32_t state))
@@ -1505,65 +1423,34 @@ end:
  * Returns the number of bytes written
  */
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding,
-               struct ctf_packet_index *index)
+               struct lttng_consumer_stream *stream,
+               const struct lttng_buffer_view *buffer,
+               unsigned long padding)
 {
-       unsigned long mmap_offset;
-       void *mmap_base;
        ssize_t ret = 0;
        off_t orig_offset = stream->out_fd_offset;
        /* Default is on the disk */
        int outfd = stream->out_fd;
        struct consumer_relayd_sock_pair *relayd = NULL;
        unsigned int relayd_hang_up = 0;
+       const size_t subbuf_content_size = buffer->size - padding;
+       size_t write_len;
 
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
        /* Flag that the current stream if set for network streaming. */
-       if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (stream->relayd_id != (uint64_t) -1ULL) {
+               relayd = consumer_find_relayd(stream->relayd_id);
                if (relayd == NULL) {
                        ret = -EPIPE;
                        goto end;
                }
        }
 
-       /* get the offset inside the fd to mmap */
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               mmap_base = stream->mmap_base;
-               ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
-               if (ret < 0) {
-                       PERROR("tracer ctl get_mmap_read_offset");
-                       goto end;
-               }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               mmap_base = lttng_ustctl_get_mmap_base(stream);
-               if (!mmap_base) {
-                       ERR("read mmap get mmap base for stream %s", stream->name);
-                       ret = -EPERM;
-                       goto end;
-               }
-               ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
-               if (ret != 0) {
-                       PERROR("tracer ctl get_mmap_read_offset");
-                       ret = -EINVAL;
-                       goto end;
-               }
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-       }
-
        /* Handle stream on the relayd if the output is on the network */
        if (relayd) {
-               unsigned long netlen = len;
+               unsigned long netlen = subbuf_content_size;
 
                /*
                 * Lock the control socket for the complete duration of the function
@@ -1601,10 +1488,10 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                                goto write_error;
                        }
                }
-       } else {
-               /* No streaming, we have to set the len with the full padding */
-               len += padding;
 
+               write_len = subbuf_content_size;
+       } else {
+               /* No streaming; we have to write the full padding. */
                if (stream->metadata_flag && stream->reset_metadata_flag) {
                        ret = utils_truncate_stream_file(stream->out_fd, 0);
                        if (ret < 0) {
@@ -1618,7 +1505,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                 * Check if we need to change the tracefile before writing the packet.
                 */
                if (stream->chan->tracefile_size > 0 &&
-                               (stream->tracefile_size_current + len) >
+                               (stream->tracefile_size_current + buffer->size) >
                                stream->chan->tracefile_size) {
                        ret = utils_rotate_stream_file(stream->chan->pathname,
                                        stream->name, stream->chan->tracefile_size,
@@ -1648,19 +1535,17 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        stream->out_fd_offset = 0;
                        orig_offset = 0;
                }
-               stream->tracefile_size_current += len;
-               if (index) {
-                       index->offset = htobe64(stream->out_fd_offset);
-               }
+               stream->tracefile_size_current += buffer->size;
+               write_len = buffer->size;
        }
 
        /*
         * This call guarantee that len or less is returned. It's impossible to
         * receive a ret value that is bigger than len.
         */
-       ret = lttng_write(outfd, mmap_base + mmap_offset, len);
-       DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
-       if (ret < 0 || ((size_t) ret != len)) {
+       ret = lttng_write(outfd, buffer->data, write_len);
+       DBG("Consumer mmap write() ret %zd (len %zu)", ret, write_len);
+       if (ret < 0 || ((size_t) ret != write_len)) {
                /*
                 * Report error to caller if nothing was written else at least send the
                 * amount written.
@@ -1681,7 +1566,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        DBG("Consumer mmap write detected relayd hang up");
                } else {
                        /* Unhandled error, print it and stop function right now. */
-                       PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
+                       PERROR("Error in write mmap (ret %zd != write_len %zu)", ret,
+                                       write_len);
                }
                goto write_error;
        }
@@ -1690,9 +1576,9 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* This call is useless on a socket so better save a syscall. */
        if (!relayd) {
                /* This won't block, but will start writeout asynchronously */
-               lttng_sync_file_range(outfd, stream->out_fd_offset, len,
+               lttng_sync_file_range(outfd, stream->out_fd_offset, write_len,
                                SYNC_FILE_RANGE_WRITE);
-               stream->out_fd_offset += len;
+               stream->out_fd_offset += write_len;
                lttng_consumer_sync_trace_file(stream, orig_offset);
        }
 
@@ -1702,7 +1588,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->id);
+               lttng_consumer_cleanup_relayd(relayd);
        }
 
 end:
@@ -1725,8 +1612,7 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding,
-               struct ctf_packet_index *index)
+               unsigned long padding)
 {
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
@@ -1754,8 +1640,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        rcu_read_lock();
 
        /* Flag that the current stream if set for network streaming. */
-       if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (stream->relayd_id != (uint64_t) -1ULL) {
+               relayd = consumer_find_relayd(stream->relayd_id);
                if (relayd == NULL) {
                        written = -ret;
                        goto end;
@@ -1851,7 +1737,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        orig_offset = 0;
                }
                stream->tracefile_size_current += len;
-               index->offset = htobe64(stream->out_fd_offset);
        }
 
        while (len > 0) {
@@ -1928,7 +1813,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->id);
+               lttng_consumer_cleanup_relayd(relayd);
                /* Skip splice error so the consumer does not fail */
                goto end;
        }
@@ -2151,7 +2037,7 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 
        lttng_ht_add_unique_u64(ht, &stream->node);
 
-       lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
+       lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
                &stream->node_channel_id);
 
        /*
@@ -2373,7 +2259,7 @@ restart:
                                do {
                                        health_code_update();
 
-                                       len = ctx->on_buffer_ready(stream, ctx);
+                                       len = ctx->on_buffer_ready(stream, ctx, false);
                                        /*
                                         * We don't check the return value here since if we get
                                         * a negative len, it means an error occurred thus we
@@ -2400,7 +2286,7 @@ restart:
                                        do {
                                                health_code_update();
 
-                                               len = ctx->on_buffer_ready(stream, ctx);
+                                               len = ctx->on_buffer_ready(stream, ctx, false);
                                                /*
                                                 * We don't check the return value here since if we get
                                                 * a negative len, it means an error occurred thus we
@@ -2607,7 +2493,7 @@ void *consumer_thread_data_poll(void *data)
                        if (pollfd[i].revents & POLLPRI) {
                                DBG("Urgent read on fd %d", pollfd[i].fd);
                                high_prio = 1;
-                               len = ctx->on_buffer_ready(local_stream[i], ctx);
+                               len = ctx->on_buffer_ready(local_stream[i], ctx, false);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean the stream and free it. */
@@ -2638,7 +2524,7 @@ void *consumer_thread_data_poll(void *data)
                                        local_stream[i]->hangup_flush_done ||
                                        local_stream[i]->has_data) {
                                DBG("Normal read on fd %d", pollfd[i].fd);
-                               len = ctx->on_buffer_ready(local_stream[i], ctx);
+                               len = ctx->on_buffer_ready(local_stream[i], ctx, false);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean the stream and free it. */
@@ -3257,36 +3143,88 @@ error_testpoint:
 }
 
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
+               struct lttng_consumer_local_data *ctx,
+               bool locked_by_caller)
 {
-       ssize_t ret;
+       ssize_t ret, written_bytes;
+       struct stream_subbuffer subbuffer = {};
 
-       pthread_mutex_lock(&stream->lock);
-       if (stream->metadata_flag) {
-               pthread_mutex_lock(&stream->metadata_rdv_lock);
+       if (!locked_by_caller) {
+               stream->read_subbuffer_ops.lock(stream);
        }
 
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               ret = lttng_kconsumer_read_subbuffer(stream, ctx);
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-               ret = -ENOSYS;
-               break;
+       if (stream->read_subbuffer_ops.on_wake_up) {
+               ret = stream->read_subbuffer_ops.on_wake_up(stream);
+               if (ret) {
+                       goto end;
+               }
        }
 
-       if (stream->metadata_flag) {
-               pthread_cond_broadcast(&stream->metadata_rdv);
-               pthread_mutex_unlock(&stream->metadata_rdv_lock);
+       ret = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
+       if (ret) {
+               if (ret == -ENODATA) {
+                       /* Not an error. */
+                       ret = 0;
+                       goto sleep_stream;
+               }
+               goto end;
        }
-       pthread_mutex_unlock(&stream->lock);
+
+       ret = stream->read_subbuffer_ops.pre_consume_subbuffer(
+                       stream, &subbuffer);
+       if (ret) {
+               goto error_put_subbuf;
+       }
+
+       written_bytes = stream->read_subbuffer_ops.consume_subbuffer(
+                       ctx, stream, &subbuffer);
+       /*
+        * Should write subbuf_size amount of data when network streaming or
+        * the full padded size when we are not streaming.
+        */
+       if ((written_bytes != subbuffer.info.data.subbuf_size &&
+                           stream->relayd_id != (uint64_t) -1ULL) ||
+                       (written_bytes != subbuffer.info.data.padded_subbuf_size &&
+                                       stream->relayd_id ==
+                                                       (uint64_t) -1ULL)) {
+               /*
+                * Display the error but continue processing to try to
+                * release the subbuffer. This is a DBG statement
+                * since this can happen without being a critical
+                * error.
+                */
+               DBG("Failed to write to tracefile (written_bytes: %zd != padded subbuffer size: %lu, subbuffer size: %lu)",
+                               written_bytes, subbuffer.info.data.subbuf_size,
+                               subbuffer.info.data.padded_subbuf_size);
+       }
+
+       ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
+       if (ret) {
+               goto end;
+       }
+
+       if (stream->read_subbuffer_ops.post_consume) {
+               ret = stream->read_subbuffer_ops.post_consume(stream, &subbuffer, ctx);
+               if (ret) {
+                       goto end;
+               }
+       }
+
+stream_sleep:
+       if (stream->read_subbuffer_ops.on_sleep) {
+               stream->read_subbuffer_ops.on_sleep(stream, ctx);
+       }
+
+       ret = written_bytes;
+end:
+       if (!locked_by_caller) {
+               stream->read_subbuffer_ops.unlock(stream);
+       }
+
        return ret;
+error_put_subbuf:
+       (void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
+       goto end;
 }
 
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
@@ -3351,7 +3289,7 @@ error:
  * This will create a relayd socket pair and add it to the relayd hash table.
  * The caller MUST acquire a RCU read side lock before calling it.
  */
-int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
+ void consumer_add_relayd_socket(uint64_t relayd_id, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll,
                struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
@@ -3364,14 +3302,14 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        assert(ctx);
        assert(relayd_sock);
 
-       DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
+       DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", relayd_id);
 
        /* Get relayd reference if exists. */
-       relayd = consumer_find_relayd(net_seq_idx);
+       relayd = consumer_find_relayd(relayd_id);
        if (relayd == NULL) {
                assert(sock_type == LTTNG_STREAM_CONTROL);
                /* Not found. Allocate one. */
-               relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
+               relayd = consumer_allocate_relayd_sock_pair(relayd_id);
                if (relayd == NULL) {
                        ret = -ENOMEM;
                        ret_code = LTTCOMM_CONSUMERD_ENOMEM;
@@ -3494,7 +3432,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
 
        DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
                        sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
-                       relayd->net_seq_idx, fd);
+                       relayd->id, fd);
 
        /* We successfully added the socket. Send status back. */
        ret = consumer_send_status_msg(sock, ret_code);
@@ -3508,10 +3446,11 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
         * Add relayd socket pair to consumer data hashtable. If object already
         * exists or on error, the function gracefully returns.
         */
+       relayd->ctx = ctx;
        add_relayd(relayd);
 
        /* All good! */
-       return 0;
+       return;
 
 error:
        if (consumer_send_status_msg(sock, ret_code) < 0) {
@@ -3529,36 +3468,6 @@ error_nosignal:
        if (relayd_created) {
                free(relayd);
        }
-
-       return ret;
-}
-
-/*
- * Try to lock the stream mutex.
- *
- * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
- */
-static int stream_try_lock(struct lttng_consumer_stream *stream)
-{
-       int ret;
-
-       assert(stream);
-
-       /*
-        * Try to lock the stream mutex. On failure, we know that the stream is
-        * being used else where hence there is data still being extracted.
-        */
-       ret = pthread_mutex_trylock(&stream->lock);
-       if (ret) {
-               /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
-               ret = 0;
-               goto end;
-       }
-
-       ret = 1;
-
-end:
-       return ret;
 }
 
 /*
@@ -3572,7 +3481,7 @@ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
        struct lttng_ht_iter iter;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
-       /* Iterate over all relayd since they are indexed by net_seq_idx. */
+       /* Iterate over all relayd since they are indexed by relayd_id. */
        cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
                        node.node) {
                /*
@@ -3627,28 +3536,11 @@ int consumer_data_pending(uint64_t id)
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
-       relayd = find_relayd_by_session_id(id);
-       if (relayd) {
-               /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_begin_data_pending(&relayd->control_sock,
-                               relayd->relayd_session_id);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       /* Communication error thus the relayd so no data pending. */
-                       goto data_not_pending;
-               }
-       }
-
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&id, lttng_ht_seed),
                        ht->match_fct, &id,
                        &iter.iter, stream, node_session_id.node) {
-               /* If this call fails, the stream is being used hence data pending. */
-               ret = stream_try_lock(stream);
-               if (!ret) {
-                       goto data_pending;
-               }
+               pthread_mutex_lock(&stream->lock);
 
                /*
                 * A removed node from the hash table indicates that the stream has
@@ -3661,14 +3553,35 @@ int consumer_data_pending(uint64_t id)
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
+                               DBG("Data is pending locally on stream %" PRIu64, stream->key);
                                pthread_mutex_unlock(&stream->lock);
                                goto data_pending;
                        }
                }
 
-               /* Relayd check */
-               if (relayd) {
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       relayd = find_relayd_by_session_id(id);
+       if (relayd) {
+               unsigned int is_data_inflight = 0;
+
+               /* Send init command for data pending. */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_begin_data_pending(&relayd->control_sock,
+                               relayd->relayd_session_id);
+               if (ret < 0) {
+                       /* Communication error thus the relayd so no data pending. */
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                       ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->id);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       goto data_not_pending;
+               }
+
+               cds_lfht_for_each_entry_duplicate(ht->ht,
+                               ht->hash_fct(&id, lttng_ht_seed),
+                               ht->match_fct, &id,
+                               &iter.iter, stream, node_session_id.node) {
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock,
                                                stream->relayd_stream_id);
@@ -3677,27 +3590,29 @@ int consumer_data_pending(uint64_t id)
                                                stream->relayd_stream_id,
                                                stream->next_net_seq_num - 1);
                        }
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret == 1) {
-                               pthread_mutex_unlock(&stream->lock);
+                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                                goto data_pending;
                        }
+                       if (ret < 0) {
+                               ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->id);
+                               lttng_consumer_cleanup_relayd(relayd);
+                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                               goto data_not_pending;
+                       }
                }
-               pthread_mutex_unlock(&stream->lock);
-       }
 
-       if (relayd) {
-               unsigned int is_data_inflight = 0;
-
-               /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               /* Send end command for data pending. */
                ret = relayd_end_data_pending(&relayd->control_sock,
                                relayd->relayd_session_id, &is_data_inflight);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->id);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto data_not_pending;
                }
                if (is_data_inflight) {
+                       DBG("Data is in flight on relayd %" PRIu64, relayd->id);
                        goto data_pending;
                }
        }
This page took 0.039626 seconds and 5 git commands to generate.