Fix: use "flush empty" ioctl for snapshots
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index b1cc03e860a0e9b10d34a5e8f528f3fcb4d90ae5..b5afc73aa5fb167071145b35e6d8a16a4291cbc6 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
  *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License, version 2 only,
@@ -62,12 +63,24 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
        ret = kernctl_snapshot(infd);
        if (ret != 0) {
                PERROR("Getting sub-buffer snapshot.");
-               ret = -errno;
        }
 
        return ret;
 }
 
+/*
+ * Sample consumed and produced positions for a specific fd.
+ *
+ * Returns 0 on success, < 0 on error.
+ */
+int lttng_kconsumer_sample_snapshot_positions(
+               struct lttng_consumer_stream *stream)
+{
+       assert(stream);
+
+       return kernctl_snapshot_sample_positions(stream->wait_fd);
+}
+
 /*
  * Get the produced position
  *
@@ -82,7 +95,6 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
        ret = kernctl_snapshot_get_produced(infd, pos);
        if (ret != 0) {
                PERROR("kernctl_snapshot_get_produced");
-               ret = -errno;
        }
 
        return ret;
@@ -102,7 +114,6 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
        ret = kernctl_snapshot_get_consumed(infd, pos);
        if (ret != 0) {
                PERROR("kernctl_snapshot_get_consumed");
-               ret = -errno;
        }
 
        return ret;
@@ -118,7 +129,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
-       unsigned long consumed_pos, produced_pos;
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
 
@@ -143,6 +153,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /* Are we at a position _before_ the first available packet ? */
                bool before_first_packet = true;
+               unsigned long consumed_pos, produced_pos;
 
                health_code_update();
 
@@ -185,12 +196,23 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                ERR("sending streams sent to relayd");
                                goto end_unlock;
                        }
+                       channel->streams_sent_to_relayd = true;
                }
 
-               ret = kernctl_buffer_flush(stream->wait_fd);
+               ret = kernctl_buffer_flush_empty(stream->wait_fd);
                if (ret < 0) {
-                       ERR("Failed to flush kernel stream");
-                       ret = -errno;
+                       /*
+                        * Doing a buffer flush which does not take into
+                        * account empty packets. This is not perfect
+                        * for stream intersection, but required as a
+                        * fall-back when "flush_empty" is not
+                        * implemented by lttng-modules.
+                        */
+                       ret = kernctl_buffer_flush(stream->wait_fd);
+                       if (ret < 0) {
+                               ERR("Failed to flush kernel stream");
+                               goto end_unlock;
+                       }
                        goto end_unlock;
                }
 
@@ -217,7 +239,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                        &stream->max_sb_size);
                        if (ret < 0) {
                                ERR("Getting kernel max_sb_size");
-                               ret = -errno;
                                goto end_unlock;
                        }
                }
@@ -237,9 +258,8 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
 
                        ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
                        if (ret < 0) {
-                               if (errno != EAGAIN) {
+                               if (ret != -EAGAIN) {
                                        PERROR("kernctl_get_subbuf snapshot");
-                                       ret = -errno;
                                        goto end_unlock;
                                }
                                DBG("Kernel consumer get subbuf failed. Skipping it.");
@@ -259,14 +279,12 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_subbuf_size");
-                               ret = -errno;
                                goto error_put_subbuf;
                        }
 
                        ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_padded_subbuf_size");
-                               ret = -errno;
                                goto error_put_subbuf;
                        }
 
@@ -292,7 +310,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = kernctl_put_subbuf(stream->wait_fd);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_put_subbuf");
-                               ret = -errno;
                                goto end_unlock;
                        }
                        consumed_pos += stream->max_sb_size;
@@ -331,7 +348,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
 error_put_subbuf:
        ret = kernctl_put_subbuf(stream->wait_fd);
        if (ret < 0) {
-               ret = -errno;
                ERR("Snapshot kernctl_put_subbuf error path");
        }
 end_unlock:
@@ -543,9 +559,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                } else {
                        ret = consumer_add_channel(new_channel, ctx);
                }
-               if (CONSUMER_CHANNEL_TYPE_DATA) {
+               if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) {
+                       int monitor_start_ret;
+
+                       DBG("Consumer starting monitor timer");
                        consumer_timer_live_start(new_channel,
                                        msg.u.channel.live_timer_interval);
+                       monitor_start_ret = consumer_timer_monitor_start(
+                                       new_channel,
+                                       msg.u.channel.monitor_timer_interval);
+                       if (monitor_start_ret < 0) {
+                               ERR("Starting channel monitoring timer failed");
+                               goto end_nosignal;
+                       }
+
                }
 
                health_code_update();
@@ -726,6 +753,19 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                consumer_stream_free(new_stream);
                                goto end_nosignal;
                        }
+
+                       /*
+                        * If adding an extra stream to an already
+                        * existing channel (e.g. cpu hotplug), we need
+                        * to send the "streams_sent" command to relayd.
+                        */
+                       if (channel->streams_sent_to_relayd) {
+                               ret = consumer_send_relayd_streams_sent(
+                                               new_stream->net_seq_idx);
+                               if (ret < 0) {
+                                       goto end_nosignal;
+                               }
+                       }
                }
 
                /* Get the right pipe where the stream will be sent. */
@@ -819,6 +859,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        if (ret < 0) {
                                goto end_nosignal;
                        }
+                       channel->streams_sent_to_relayd = true;
                }
                break;
        }
@@ -1022,6 +1063,55 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                break;
        }
+       case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
+       {
+               int channel_monitor_pipe;
+
+               ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+               /* Successfully received the command's type. */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       goto error_fatal;
+               }
+
+               ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
+                               1);
+               if (ret != sizeof(channel_monitor_pipe)) {
+                       ERR("Failed to receive channel monitor pipe");
+                       goto error_fatal;
+               }
+
+               DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
+               ret = consumer_timer_thread_set_channel_monitor_pipe(
+                               channel_monitor_pipe);
+               if (!ret) {
+                       int flags;
+
+                       ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+                       /* Set the pipe as non-blocking. */
+                       ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
+                       if (ret == -1) {
+                               PERROR("fcntl get flags of the channel monitoring pipe");
+                               goto error_fatal;
+                       }
+                       flags = ret;
+
+                       ret = fcntl(channel_monitor_pipe, F_SETFL,
+                                       flags | O_NONBLOCK);
+                       if (ret == -1) {
+                               PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
+                               goto error_fatal;
+                       }
+                       DBG("Channel monitor pipe set as non-blocking");
+               } else {
+                       ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
+               }
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       goto error_fatal;
+               }
+               break;
+       }
        default:
                goto end_nosignal;
        }
@@ -1095,15 +1185,27 @@ static int get_index_values(struct ctf_packet_index *index, int infd)
 
        ret = kernctl_get_instance_id(infd, &index->stream_instance_id);
        if (ret < 0) {
-               PERROR("kernctl_get_instance_id");
-               goto error;
+               if (ret == -ENOTTY) {
+                       /* Command not implemented by lttng-modules. */
+                       index->stream_instance_id = -1ULL;
+                       ret = 0;
+               } else {
+                       PERROR("kernctl_get_instance_id");
+                       goto error;
+               }
        }
        index->stream_instance_id = htobe64(index->stream_instance_id);
 
        ret = kernctl_get_sequence_number(infd, &index->packet_seq_num);
        if (ret < 0) {
-               PERROR("kernctl_get_sequence_number");
-               goto error;
+               if (ret == -ENOTTY) {
+                       /* Command not implemented by lttng-modules. */
+                       index->packet_seq_num = -1ULL;
+                       ret = 0;
+               } else {
+                       PERROR("kernctl_get_sequence_number");
+                       goto error;
+               }
        }
        index->packet_seq_num = htobe64(index->packet_seq_num);
 
@@ -1133,7 +1235,7 @@ int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
 
        ret = kernctl_snapshot(metadata->wait_fd);
        if (ret < 0) {
-               if (errno != EAGAIN) {
+               if (ret != -EAGAIN) {
                        ERR("Sync metadata, taking kernel snapshot failed.");
                        goto end;
                }
@@ -1155,8 +1257,14 @@ int update_stream_stats(struct lttng_consumer_stream *stream)
 
        ret = kernctl_get_sequence_number(stream->wait_fd, &seq);
        if (ret < 0) {
-               PERROR("kernctl_get_sequence_number");
-               goto end;
+               if (ret == -ENOTTY) {
+                       /* Command not implemented by lttng-modules. */
+                       seq = -1ULL;
+                       ret = 0;
+               } else {
+                       PERROR("kernctl_get_sequence_number");
+                       goto end;
+               }
        }
 
        /*
@@ -1215,6 +1323,14 @@ int metadata_stream_check_version(int infd, struct lttng_consumer_stream *stream
 
        ret = kernctl_get_metadata_version(infd, &cur_version);
        if (ret < 0) {
+               if (ret == -ENOTTY) {
+                       /*
+                        * LTTng-modules does not implement this
+                        * command.
+                        */
+                       ret = 0;
+                       goto end;
+               }
                ERR("Failed to get the metadata version");
                goto end;
        }
@@ -1258,7 +1374,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                 */
                DBG("Reserving sub buffer failed (everything is normal, "
                                "it is due to concurrency)");
-               ret = -errno;
+               ret = err;
                goto end;
        }
 
@@ -1268,16 +1384,16 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                PERROR("Getting sub-buffer len failed.");
                err = kernctl_put_subbuf(infd);
                if (err != 0) {
-                       if (errno == EFAULT) {
+                       if (err == -EFAULT) {
                                PERROR("Error in unreserving sub buffer\n");
-                       } else if (errno == EIO) {
+                       } else if (err == -EIO) {
                                /* Should never happen with newer LTTng versions */
                                PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                        }
-                       ret = -errno;
+                       ret = err;
                        goto end;
                }
-               ret = -errno;
+               ret = err;
                goto end;
        }
 
@@ -1286,25 +1402,47 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                if (ret < 0) {
                        err = kernctl_put_subbuf(infd);
                        if (err != 0) {
-                               if (errno == EFAULT) {
+                               if (err == -EFAULT) {
                                        PERROR("Error in unreserving sub buffer\n");
-                               } else if (errno == EIO) {
+                               } else if (err == -EIO) {
                                        /* Should never happen with newer LTTng versions */
                                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                                }
-                               ret = -errno;
+                               ret = err;
                                goto end;
                        }
                        goto end;
                }
                ret = update_stream_stats(stream);
                if (ret < 0) {
+                       err = kernctl_put_subbuf(infd);
+                       if (err != 0) {
+                               if (err == -EFAULT) {
+                                       PERROR("Error in unreserving sub buffer\n");
+                               } else if (err == -EIO) {
+                                       /* Should never happen with newer LTTng versions */
+                                       PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+                               }
+                               ret = err;
+                               goto end;
+                       }
                        goto end;
                }
        } else {
                write_index = 0;
                ret = metadata_stream_check_version(infd, stream);
                if (ret < 0) {
+                       err = kernctl_put_subbuf(infd);
+                       if (err != 0) {
+                               if (err == -EFAULT) {
+                                       PERROR("Error in unreserving sub buffer\n");
+                               } else if (err == -EIO) {
+                                       /* Should never happen with newer LTTng versions */
+                                       PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+                               }
+                               ret = err;
+                               goto end;
+                       }
                        goto end;
                }
        }
@@ -1343,16 +1481,16 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        PERROR("Getting sub-buffer len failed.");
                        err = kernctl_put_subbuf(infd);
                        if (err != 0) {
-                               if (errno == EFAULT) {
+                               if (err == -EFAULT) {
                                        PERROR("Error in unreserving sub buffer\n");
-                               } else if (errno == EIO) {
+                               } else if (err == -EIO) {
                                        /* Should never happen with newer LTTng versions */
                                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                                }
-                               ret = -errno;
+                               ret = err;
                                goto end;
                        }
-                       ret = -errno;
+                       ret = err;
                        goto end;
                }
 
@@ -1389,13 +1527,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
        err = kernctl_put_next_subbuf(infd);
        if (err != 0) {
-               if (errno == EFAULT) {
+               if (err == -EFAULT) {
                        PERROR("Error in unreserving sub buffer\n");
-               } else if (errno == EIO) {
+               } else if (err == -EIO) {
                        /* Should never happen with newer LTTng versions */
                        PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
                }
-               ret = -errno;
+               ret = err;
                goto end;
        }
 
@@ -1459,14 +1597,18 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                stream->tracefile_size_current = 0;
 
                if (!stream->metadata_flag) {
-                       ret = index_create_file(stream->chan->pathname,
+                       struct lttng_index_file *index_file;
+
+                       index_file = lttng_index_file_create(stream->chan->pathname,
                                        stream->name, stream->uid, stream->gid,
                                        stream->chan->tracefile_size,
-                                       stream->tracefile_count_current);
-                       if (ret < 0) {
+                                       stream->tracefile_count_current,
+                                       CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                       if (!index_file) {
                                goto error;
                        }
-                       stream->index_fd = ret;
+                       assert(!stream->index_file);
+                       stream->index_file = index_file;
                }
        }
 
@@ -1477,7 +1619,6 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
                if (ret != 0) {
                        PERROR("kernctl_get_mmap_len");
-                       ret = -errno;
                        goto error_close_fd;
                }
                stream->mmap_len = (size_t) mmap_len;
This page took 0.030736 seconds and 5 git commands to generate.