Fix: relayd vs consumerd compatibility
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 15 Dec 2016 10:04:57 +0000 (11:04 +0100)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 16 Dec 2016 08:09:32 +0000 (03:09 -0500)
relay and consumerd 2.7 and 2.8 are expected to negociate compatibility
with the lowest common minor version.

If a consumer daemon 2.8 interacts with a relayd 2.7, it needs to send
the index fields for ctf index 1.0. Same if a relayd 2.8 interacts with
a consumer daemon 2.7: relayd should expect ctf index 1.0 fields, and
generate a ctf index 1.0 index file layout.

If both relayd and consumerd versions are 2.8+, then we can send the ctf
index 1.1 fields over the protocol, and store them in the index files.

Whenever the relayd live viewer server opens and reads an index file,
it needs to use the file's header to figure out the index "element"
size.

[ Should be applied to master, stable-2.9, stable-2.8. ]

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
19 files changed:
src/bin/lttng-relayd/index.c
src/bin/lttng-relayd/index.h
src/bin/lttng-relayd/live.c
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/stream.c
src/bin/lttng-relayd/stream.h
src/bin/lttng-relayd/viewer-stream.c
src/bin/lttng-relayd/viewer-stream.h
src/common/consumer/consumer-stream.c
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/index/ctf-index.h
src/common/index/index.c
src/common/index/index.h
src/common/kernel-consumer/kernel-consumer.c
src/common/macros.h
src/common/relayd/relayd.c
src/common/sessiond-comm/relayd.h
src/common/ust-consumer/ust-consumer.c

index 80a4bb94d2a0651dc24100425c52ca4047b06209..b15bbcd7702954e675a98c06e02ab31058ff7499 100644 (file)
@@ -166,18 +166,19 @@ end:
        return index;
 }
 
-int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+int relay_index_set_file(struct relay_index *index,
+               struct lttng_index_file *index_file,
                uint64_t data_offset)
 {
        int ret = 0;
 
        pthread_mutex_lock(&index->lock);
-       if (index->index_fd) {
+       if (index->index_file) {
                ret = -1;
                goto end;
        }
-       stream_fd_get(index_fd);
-       index->index_fd = index_fd;
+       lttng_index_file_get(index_file);
+       index->index_file = index_file;
        index->index_data.offset = data_offset;
 end:
        pthread_mutex_unlock(&index->lock);
@@ -228,9 +229,9 @@ static void index_release(struct urcu_ref *ref)
        int ret;
        struct lttng_ht_iter iter;
 
-       if (index->index_fd) {
-               stream_fd_put(index->index_fd);
-               index->index_fd = NULL;
+       if (index->index_file) {
+               lttng_index_file_put(index->index_file);
+               index->index_file = NULL;
        }
        if (index->in_hash_table) {
                /* Delete index from hash table. */
@@ -290,21 +291,16 @@ int relay_index_try_flush(struct relay_index *index)
                goto skip;
        }
        /* Check if we are ready to flush. */
-       if (!index->has_index_data || !index->index_fd) {
+       if (!index->has_index_data || !index->index_file) {
                goto skip;
        }
-       fd = index->index_fd->fd;
+       fd = index->index_file->fd;
        DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
                        " on fd %d", index->stream->stream_handle,
                        index->index_n.key, fd);
        flushed = true;
        index->flushed = true;
-       ret = index_write(fd, &index->index_data, sizeof(index->index_data));
-       if (ret == sizeof(index->index_data)) {
-               ret = 0;
-       } else {
-               ret = -1;
-       }
+       ret = lttng_index_file_write(index->index_file, &index->index_data);
 skip:
        pthread_mutex_unlock(&index->lock);
 
@@ -341,11 +337,11 @@ void relay_index_close_partial_fd(struct relay_stream *stream)
        rcu_read_lock();
        cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
                        index, index_n.node) {
-               if (!index->index_fd) {
+               if (!index->index_file) {
                        continue;
                }
                /*
-                * Partial index has its index_fd: we have only
+                * Partial index has its index_file: we have only
                 * received its info from the data socket.
                 * Put self-ref from index.
                 */
index 15c4ac8cd8201b906e458a4e819416004a9a1d1e..80fe86ab024f8e668e51a1553d52385e1c666ad2 100644 (file)
@@ -40,10 +40,10 @@ struct relay_index {
 
        pthread_mutex_t lock;
        /*
-        * FD on which to write the index data. May differ from
-        * stream->index_fd due to tracefile rotation.
+        * index file on which to write the index data. May differ from
+        * stream->index_file due to tracefile rotation.
         */
-       struct stream_fd *index_fd;
+       struct lttng_index_file *index_file;
 
        /* Index packet data. This is the data that is written on disk. */
        struct ctf_packet_index index_data;
@@ -64,8 +64,9 @@ struct relay_index {
 struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
                uint64_t net_seq_num);
 void relay_index_put(struct relay_index *index);
-int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
-                uint64_t data_offset);
+int relay_index_set_file(struct relay_index *index,
+               struct lttng_index_file *index_file,
+               uint64_t data_offset);
 int relay_index_set_data(struct relay_index *index,
                 const struct ctf_packet_index *data);
 int relay_index_try_flush(struct relay_index *index);
index 2c3a10574b5499c6a4a8585f3a22e5eac05562bc..9e1085f33615018237a4acd2abd434c2362c265b 100644 (file)
@@ -1122,8 +1122,8 @@ error:
 /*
  * Open the index file if needed for the given vstream.
  *
- * If an index file is successfully opened, the vstream index_fd set with
- * it.
+ * If an index file is successfully opened, the vstream will set it as its
+ * current index file.
  *
  * Return 0 on success, a negative value on error (-ENOENT if not ready yet).
  *
@@ -1134,7 +1134,7 @@ static int try_open_index(struct relay_viewer_stream *vstream,
 {
        int ret = 0;
 
-       if (vstream->index_fd) {
+       if (vstream->index_file) {
                goto end;
        }
 
@@ -1145,20 +1145,12 @@ static int try_open_index(struct relay_viewer_stream *vstream,
                ret = -ENOENT;
                goto end;
        }
-       ret = index_open(vstream->path_name, vstream->channel_name,
+       vstream->index_file = lttng_index_file_open(vstream->path_name,
+                       vstream->channel_name,
                        vstream->stream->tracefile_count,
                        vstream->current_tracefile_id);
-       if (ret >= 0) {
-               vstream->index_fd = stream_fd_create(ret);
-               if (!vstream->index_fd) {
-                       if (close(ret)) {
-                               PERROR("close");
-                       }
-                       ret = -1;
-               } else {
-                       ret = 0;
-               }
-               goto end;
+       if (!vstream->index_file) {
+               ret = -1;
        }
 
 end:
@@ -1277,7 +1269,6 @@ static
 int viewer_get_next_index(struct relay_connection *conn)
 {
        int ret;
-       ssize_t read_ret;
        struct lttng_viewer_get_next_index request_index;
        struct lttng_viewer_index viewer_index;
        struct ctf_packet_index packet_index;
@@ -1400,11 +1391,10 @@ int viewer_get_next_index(struct relay_connection *conn)
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
        }
 
-       read_ret = lttng_read(vstream->index_fd->fd, &packet_index,
-                       sizeof(packet_index));
-       if (read_ret < sizeof(packet_index)) {
-               ERR("Relay reading index file %d returned %zd",
-                       vstream->index_fd->fd, read_ret);
+       ret = lttng_index_file_read(vstream->index_file, &packet_index);
+       if (ret) {
+               ERR("Relay error reading index file %d",
+                               vstream->index_file->fd);
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                goto send_reply;
        } else {
index 9c0e2b1e37158d728ace4bae91a21a70d9d80a93..dc19a69c77a548df81fffc2aa7e44affd12279b4 100644 (file)
@@ -1946,6 +1946,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
        struct lttcomm_relayd_generic_reply reply;
        struct relay_stream *stream;
        uint64_t net_seq_num;
+       size_t msg_len;
 
        assert(conn);
 
@@ -1957,9 +1958,12 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                goto end_no_session;
        }
 
+       msg_len = lttcomm_relayd_index_len(
+                       lttng_to_index_major(conn->major, conn->minor),
+                       lttng_to_index_minor(conn->major, conn->minor));
        ret = conn->sock->ops->recvmsg(conn->sock, &index_info,
-                       sizeof(index_info), 0);
-       if (ret < sizeof(index_info)) {
+                       msg_len, 0);
+       if (ret < msg_len) {
                if (ret == 0) {
                        /* Orderly shutdown. Not necessary to print an error. */
                        DBG("Socket %d did an orderly shutdown", conn->sock->fd);
@@ -2183,41 +2187,36 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                goto end;
        }
 
-       if (rotate_index || !stream->index_fd) {
-               int fd;
+       if (rotate_index || !stream->index_file) {
+               uint32_t major, minor;
 
-               /* Put ref on previous index_fd. */
-               if (stream->index_fd) {
-                       stream_fd_put(stream->index_fd);
-                       stream->index_fd = NULL;
+               /* Put ref on previous index_file. */
+               if (stream->index_file) {
+                       lttng_index_file_put(stream->index_file);
+                       stream->index_file = NULL;
                }
-
-               fd = index_create_file(stream->path_name, stream->channel_name,
+               major = stream->trace->session->major;
+               minor = stream->trace->session->minor;
+               stream->index_file = lttng_index_file_create(stream->path_name,
+                               stream->channel_name,
                                -1, -1, stream->tracefile_size,
-                               tracefile_array_get_file_index_head(stream->tfa));
-               if (fd < 0) {
+                               tracefile_array_get_file_index_head(stream->tfa),
+                               lttng_to_index_major(major, minor),
+                               lttng_to_index_minor(major, minor));
+               if (!stream->index_file) {
                        ret = -1;
                        /* Put self-ref for this index due to error. */
                        relay_index_put(index);
-                       goto end;
-               }
-               stream->index_fd = stream_fd_create(fd);
-               if (!stream->index_fd) {
-                       ret = -1;
-                       if (close(fd)) {
-                               PERROR("Error closing FD %d", fd);
-                       }
-                       /* Put self-ref for this index due to error. */
-                       relay_index_put(index);
-                       /* Will put the local ref. */
+                       index = NULL;
                        goto end;
                }
        }
 
-       if (relay_index_set_fd(index, stream->index_fd, data_offset)) {
+       if (relay_index_set_file(index, stream->index_file, data_offset)) {
                ret = -1;
                /* Put self-ref for this index due to error. */
                relay_index_put(index);
+               index = NULL;
                goto end;
        }
 
@@ -2231,6 +2230,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
        } else {
                /* Put self-ref for this index due to error. */
                relay_index_put(index);
+               index = NULL;
                ret = -1;
        }
 end:
index 335a1cf5f79de497f3002beb3ae0a2a983da355f..c59bb9416b1c9609ca34884cbc6126daf1295b78 100644 (file)
@@ -306,9 +306,9 @@ static void stream_release(struct urcu_ref *ref)
                stream_fd_put(stream->stream_fd);
                stream->stream_fd = NULL;
        }
-       if (stream->index_fd) {
-               stream_fd_put(stream->index_fd);
-               stream->index_fd = NULL;
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
        }
        if (stream->trace) {
                ctf_trace_put(stream->trace);
index 5030e5d4c47e03291b22102aaef1b50a093bd33a..d471c7b7f49561c7c1882b5c584822daf037fcb5 100644 (file)
@@ -58,8 +58,8 @@ struct relay_stream {
 
        /* FD on which to write the stream data. */
        struct stream_fd *stream_fd;
-       /* FD on which to write the index data. */
-       struct stream_fd *index_fd;
+       /* index file on which to write the index data. */
+       struct lttng_index_file *index_file;
 
        char *path_name;
        char *channel_name;
index 7c59cd05d88ea3797569e87d8d964d67aa61e562..8a3b09a92060ef25e9f9f8f50b2d36d15c2de159 100644 (file)
@@ -116,29 +116,21 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
         * the opening of the index, otherwise open it right now.
         */
        if (stream->index_received_seqcount == 0) {
-               vstream->index_fd = NULL;
+               vstream->index_file = NULL;
        } else {
-               int read_fd;
-
-               read_fd = index_open(vstream->path_name, vstream->channel_name,
+               vstream->index_file = lttng_index_file_open(vstream->path_name,
+                               vstream->channel_name,
                                stream->tracefile_count,
                                vstream->current_tracefile_id);
-               if (read_fd < 0) {
-                       goto error_unlock;
-               }
-               vstream->index_fd = stream_fd_create(read_fd);
-               if (!vstream->index_fd) {
-                       if (close(read_fd)) {
-                               PERROR("close");
-                       }
+               if (!vstream->index_file) {
                        goto error_unlock;
                }
        }
 
-       if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_fd) {
+       if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) {
                off_t lseek_ret;
 
-               lseek_ret = lseek(vstream->index_fd->fd, 0, SEEK_END);
+               lseek_ret = lseek(vstream->index_file->fd, 0, SEEK_END);
                if (lseek_ret < 0) {
                        goto error_unlock;
                }
@@ -192,9 +184,9 @@ static void viewer_stream_release(struct urcu_ref *ref)
                stream_fd_put(vstream->stream_fd);
                vstream->stream_fd = NULL;
        }
-       if (vstream->index_fd) {
-               stream_fd_put(vstream->index_fd);
-               vstream->index_fd = NULL;
+       if (vstream->index_file) {
+               lttng_index_file_put(vstream->index_file);
+               vstream->index_file = NULL;
        }
        if (vstream->stream) {
                stream_put(vstream->stream);
@@ -305,29 +297,24 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
                vstream->index_sent_seqcount = seq_tail;
        }
 
-       if (vstream->index_fd) {
-               stream_fd_put(vstream->index_fd);
-               vstream->index_fd = NULL;
+       if (vstream->index_file) {
+               lttng_index_file_put(vstream->index_file);
+               vstream->index_file = NULL;
        }
        if (vstream->stream_fd) {
                stream_fd_put(vstream->stream_fd);
                vstream->stream_fd = NULL;
        }
 
-       ret = index_open(vstream->path_name, vstream->channel_name,
+       vstream->index_file = lttng_index_file_open(vstream->path_name,
+                       vstream->channel_name,
                        stream->tracefile_count,
                        vstream->current_tracefile_id);
-       if (ret < 0) {
+       if (!vstream->index_file) {
+               ret = -1;
                goto end;
-       }
-       vstream->index_fd = stream_fd_create(ret);
-       if (vstream->index_fd) {
-               ret = 0;
        } else {
-               if (close(ret)) {
-                       PERROR("close");
-               }
-               ret = -1;
+               ret = 0;
        }
 end:
        return ret;
index 5dc135dc6c4972eb104aec630ca088cbd8ed62fc..2514b172214ac02d6929042180cc15b7215e94b9 100644 (file)
@@ -52,8 +52,8 @@ struct relay_viewer_stream {
 
        /* FD from which to read the stream data. */
        struct stream_fd *stream_fd;
-       /* FD from which to read the index data. */
-       struct stream_fd *index_fd;
+       /* index file from which to read the index data. */
+       struct lttng_index_file *index_file;
 
        char *path_name;
        char *channel_name;
index a62cef272294d360953e5d685fca745d7509bcff..522b3cd5cd8ac0c45876c12dd7e344c8c7c27118 100644 (file)
@@ -163,12 +163,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                stream->out_fd = -1;
        }
 
-       if (stream->index_fd >= 0) {
-               ret = close(stream->index_fd);
-               if (ret) {
-                       PERROR("close stream index_fd");
-               }
-               stream->index_fd = -1;
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
        }
 
        /* Check and cleanup relayd if needed. */
@@ -359,27 +356,23 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
  * Return 0 on success or else a negative value.
  */
 int consumer_stream_write_index(struct lttng_consumer_stream *stream,
-               struct ctf_packet_index *index)
+               struct ctf_packet_index *element)
 {
        int ret;
        struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
-       assert(index);
+       assert(element);
 
        rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd) {
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_send_index(&relayd->control_sock, index,
+               ret = relayd_send_index(&relayd->control_sock, element,
                                stream->relayd_stream_id, stream->next_net_seq_num - 1);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        } else {
-               ssize_t size_ret;
-
-               size_ret = index_write(stream->index_fd, index,
-                               sizeof(struct ctf_packet_index));
-               if (size_ret < sizeof(struct ctf_packet_index)) {
+               if (lttng_index_file_write(stream->index_file, element)) {
                        ret = -1;
                } else {
                        ret = 0;
index a184afd03d24a32406a5b145fce4690ff598feb0..929be6553a7ea76dc54694cb149d988b448efa3b 100644 (file)
@@ -570,7 +570,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->session_id = session_id;
        stream->monitor = monitor;
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
-       stream->index_fd = -1;
+       stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
@@ -1624,21 +1624,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        }
                        outfd = stream->out_fd;
 
-                       if (stream->index_fd >= 0) {
-                               ret = close(stream->index_fd);
-                               if (ret < 0) {
-                                       PERROR("Closing index");
-                                       goto end;
-                               }
-                               stream->index_fd = -1;
-                               ret = index_create_file(stream->chan->pathname,
+                       if (stream->index_file) {
+                               lttng_index_file_put(stream->index_file);
+                               stream->index_file = lttng_index_file_create(stream->chan->pathname,
                                                stream->name, stream->uid, stream->gid,
                                                stream->chan->tracefile_size,
-                                               stream->tracefile_count_current);
-                               if (ret < 0) {
+                                               stream->tracefile_count_current,
+                                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                               if (!stream->index_file) {
                                        goto end;
                                }
-                               stream->index_fd = ret;
                        }
 
                        /* Reset current size because we just perform a rotation. */
@@ -1831,22 +1826,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        }
                        outfd = stream->out_fd;
 
-                       if (stream->index_fd >= 0) {
-                               ret = close(stream->index_fd);
-                               if (ret < 0) {
-                                       PERROR("Closing index");
-                                       goto end;
-                               }
-                               stream->index_fd = -1;
-                               ret = index_create_file(stream->chan->pathname,
+                       if (stream->index_file) {
+                               lttng_index_file_put(stream->index_file);
+                               stream->index_file = lttng_index_file_create(stream->chan->pathname,
                                                stream->name, stream->uid, stream->gid,
                                                stream->chan->tracefile_size,
-                                               stream->tracefile_count_current);
-                               if (ret < 0) {
-                                       written = ret;
+                                               stream->tracefile_count_current,
+                                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                               if (!stream->index_file) {
                                        goto end;
                                }
-                               stream->index_fd = ret;
                        }
 
                        /* Reset current size because we just perform a rotation. */
index 256ea1976016feffcf805d5d5bd0df3d8598a2bf..acefacb218e3b8316011dff9f7c289bcda9428bd 100644 (file)
@@ -390,9 +390,9 @@ struct lttng_consumer_stream {
        /* Copy of the sequence number of the last packet extracted. */
        uint64_t last_sequence_number;
        /*
-        * FD of the index file for this stream.
+        * Index file object of the index file for this stream.
         */
-       int index_fd;
+       struct lttng_index_file *index_file;
 
        /*
         * Local pipe to extract data when using splice.
index 8755f1218ad3d114ac8f464051de42d7f92d23af..5d6bd80f4cf11b32ed81574141e4ed7c5854c33a 100644 (file)
@@ -60,4 +60,45 @@ struct ctf_packet_index {
        uint64_t packet_seq_num;        /* packet sequence number */
 } __attribute__((__packed__));
 
+static inline size_t ctf_packet_index_len(uint32_t major, uint32_t minor)
+{
+       if (major == 1) {
+               switch (minor) {
+               case 0:
+                       return offsetof(struct ctf_packet_index, stream_id)
+                               + member_sizeof(struct ctf_packet_index,
+                                               stream_id);
+               case 1:
+                       return offsetof(struct ctf_packet_index, packet_seq_num)
+                               + member_sizeof(struct ctf_packet_index,
+                                               packet_seq_num);
+               default:
+                       abort();
+               }
+       }
+       abort();
+}
+
+static inline uint32_t lttng_to_index_major(uint32_t lttng_major,
+               uint32_t lttng_minor)
+{
+       if (lttng_major == 2) {
+               return 1;
+       }
+       abort();
+}
+
+static inline uint32_t lttng_to_index_minor(uint32_t lttng_major,
+               uint32_t lttng_minor)
+{
+       if (lttng_major == 2) {
+               if (lttng_minor < 8) {
+                       return 0;
+               } else {
+                       return 1;
+               }
+       }
+       abort();
+}
+
 #endif /* LTTNG_INDEX_H */
index 066618e8bd1a70ea3a7bb7ebe9e2b4b76ece1cc1..b5591d137c923a97617005f9a85145e09264cba4 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
  *                      David Goulet <dgoulet@efficios.com>
+ *               2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
 /*
  * Create the index file associated with a trace file.
  *
- * Return fd on success, a negative value on error.
+ * Return allocated struct lttng_index_file, NULL on error.
  */
-int index_create_file(char *path_name, char *stream_name, int uid, int gid,
-               uint64_t size, uint64_t count)
+struct lttng_index_file *lttng_index_file_create(char *path_name,
+               char *stream_name, int uid, int gid,
+               uint64_t size, uint64_t count, uint32_t major, uint32_t minor)
 {
+       struct lttng_index_file *index_file;
        int ret, fd = -1;
        ssize_t size_ret;
        struct ctf_packet_index_file_hdr hdr;
        char fullpath[PATH_MAX];
+       uint32_t element_len = ctf_packet_index_len(major, minor);
+
+       index_file = zmalloc(sizeof(*index_file));
+       if (!index_file) {
+               PERROR("allocating lttng_index_file");
+               goto error;
+       }
 
        ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR,
                        path_name);
@@ -79,9 +89,9 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid,
        fd = ret;
 
        hdr.magic = htobe32(CTF_INDEX_MAGIC);
-       hdr.index_major = htobe32(CTF_INDEX_MAJOR);
-       hdr.index_minor = htobe32(CTF_INDEX_MINOR);
-       hdr.packet_index_len = htobe32(sizeof(struct ctf_packet_index));
+       hdr.index_major = htobe32(major);
+       hdr.index_minor = htobe32(minor);
+       hdr.packet_index_len = htobe32(element_len);
 
        size_ret = lttng_write(fd, &hdr, sizeof(hdr));
        if (size_ret < sizeof(hdr)) {
@@ -89,8 +99,13 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid,
                ret = -1;
                goto error;
        }
+       index_file->fd = fd;
+       index_file->major = major;
+       index_file->minor = minor;
+       index_file->element_len = element_len;
+       urcu_ref_init(&index_file->ref);
 
-       return fd;
+       return index_file;
 
 error:
        if (fd >= 0) {
@@ -101,51 +116,93 @@ error:
                        PERROR("close index fd");
                }
        }
-       return ret;
+       free(index_file);
+       return NULL;
 }
 
 /*
- * Write index values to the given fd of size len.
+ * Write index values to the given index file.
  *
- * Return "len" on success or else < len on error. errno contains error
- * details.
+ * Return 0 on success, -1 on error.
  */
-ssize_t index_write(int fd, struct ctf_packet_index *index, size_t len)
+int lttng_index_file_write(const struct lttng_index_file *index_file,
+               const struct ctf_packet_index *element)
 {
        ssize_t ret;
+       int fd = index_file->fd;
+       size_t len = index_file->element_len;
 
-       assert(index);
+       assert(element);
 
        if (fd < 0) {
-               ret = -EINVAL;
                goto error;
        }
 
-       ret = lttng_write(fd, index, len);
+       ret = lttng_write(fd, element, len);
        if (ret < len) {
                PERROR("writing index file");
+               goto error;
+       }
+       return 0;
+
+error:
+       return -1;
+}
+
+/*
+ * Read index values from the given index file.
+ *
+ * Return 0 on success, -1 on error.
+ */
+int lttng_index_file_read(const struct lttng_index_file *index_file,
+               struct ctf_packet_index *element)
+{
+       ssize_t ret;
+       int fd = index_file->fd;
+       size_t len = index_file->element_len;
+
+       assert(element);
+
+       if (fd < 0) {
+               goto error;
+       }
+
+       ret = lttng_read(fd, element, len);
+       if (ret < len) {
+               PERROR("read index file");
+               goto error;
        }
+       return 0;
 
 error:
-       return ret;
+       return -1;
 }
 
 /*
  * Open index file using a given path, channel name and tracefile count.
  *
- * Return read only FD on success or else a negative value.
+ * Return allocated struct lttng_index_file, NULL on error.
  */
-int index_open(const char *path_name, const char *channel_name,
-               uint64_t tracefile_count, uint64_t tracefile_count_current)
+struct lttng_index_file *lttng_index_file_open(const char *path_name,
+               const char *channel_name, uint64_t tracefile_count,
+               uint64_t tracefile_count_current)
 {
+       struct lttng_index_file *index_file;
        int ret, read_fd;
        ssize_t read_len;
        char fullpath[PATH_MAX];
        struct ctf_packet_index_file_hdr hdr;
+       uint32_t major, minor, element_len;
 
        assert(path_name);
        assert(channel_name);
 
+       index_file = zmalloc(sizeof(*index_file));
+       if (!index_file) {
+               PERROR("allocating lttng_index_file");
+               goto error;
+       }
+
        if (tracefile_count > 0) {
                ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s_%"
                                PRIu64 DEFAULT_INDEX_FILE_SUFFIX, path_name,
@@ -180,13 +237,22 @@ int index_open(const char *path_name, const char *channel_name,
                ERR("Invalid header magic");
                goto error_close;
        }
-       if (be32toh(hdr.index_major) != CTF_INDEX_MAJOR ||
-                       be32toh(hdr.index_minor) != CTF_INDEX_MINOR) {
+       major = be32toh(hdr.index_major);
+       minor = be32toh(hdr.index_minor);
+       element_len = be32toh(hdr.packet_index_len);
+
+       if (major != CTF_INDEX_MAJOR) {
                ERR("Invalid header version");
                goto error_close;
        }
 
-       return read_fd;
+       index_file->fd = read_fd;
+       index_file->major = major;
+       index_file->minor = minor;
+       index_file->element_len = element_len;
+       urcu_ref_init(&index_file->ref);
+
+       return index_file;
 
 error_close:
        if (read_fd >= 0) {
@@ -200,5 +266,27 @@ error_close:
        ret = -1;
 
 error:
-       return ret;
+       free(index_file);
+       return NULL;
+}
+
+void lttng_index_file_get(struct lttng_index_file *index_file)
+{
+       urcu_ref_get(&index_file->ref);
+}
+
+static void lttng_index_file_release(struct urcu_ref *ref)
+{
+       struct lttng_index_file *index_file = caa_container_of(ref,
+                       struct lttng_index_file, ref);
+
+       if (close(index_file->fd)) {
+               PERROR("close index fd");
+       }
+       free(index_file);
+}
+
+void lttng_index_file_put(struct lttng_index_file *index_file)
+{
+       urcu_ref_put(&index_file->ref, lttng_index_file_release);
 }
index a7e6aee07a046a2b7b097f76f0a920dcf2bc1ac8..7020936b646b33142f3d2021d5f540bac08f29dd 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
  *                      David Goulet <dgoulet@efficios.com>
+ *               2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
 #define _INDEX_H
 
 #include <inttypes.h>
+#include <urcu/ref.h>
 
 #include "ctf-index.h"
 
-int index_create_file(char *path_name, char *stream_name, int uid, int gid,
-               uint64_t size, uint64_t count);
-ssize_t index_write(int fd, struct ctf_packet_index *index, size_t len);
-int index_open(const char *path_name, const char *channel_name,
-               uint64_t tracefile_count, uint64_t tracefile_count_current);
+struct lttng_index_file {
+       int fd;
+       uint32_t major;
+       uint32_t minor;
+       uint32_t element_len;
+       struct urcu_ref ref;
+};
+
+/*
+ * create and open have refcount of 1. Use put to decrement the
+ * refcount. Destroys when reaching 0. Use "get" to increment refcount.
+ */
+struct lttng_index_file *lttng_index_file_create(char *path_name,
+               char *stream_name, int uid, int gid, uint64_t size,
+               uint64_t count, uint32_t major, uint32_t minor);
+struct lttng_index_file *lttng_index_file_open(const char *path_name,
+               const char *channel_name, uint64_t tracefile_count,
+               uint64_t tracefile_count_current);
+int lttng_index_file_write(const struct lttng_index_file *index_file,
+               const struct ctf_packet_index *element);
+int lttng_index_file_read(const struct lttng_index_file *index_file,
+               struct ctf_packet_index *element);
+
+void lttng_index_file_get(struct lttng_index_file *index_file);
+void lttng_index_file_put(struct lttng_index_file *index_file);
 
 #endif /* _INDEX_H */
index a8abcd7901121c62dc27ec94f7d01cee62d2a6fa..d03ff701533fdab938082a1d551efdeff4140428 100644 (file)
@@ -1475,14 +1475,17 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                stream->tracefile_size_current = 0;
 
                if (!stream->metadata_flag) {
-                       ret = index_create_file(stream->chan->pathname,
+                       struct lttng_index_file *index_file;
+
+                       index_file = lttng_index_file_create(stream->chan->pathname,
                                        stream->name, stream->uid, stream->gid,
                                        stream->chan->tracefile_size,
-                                       stream->tracefile_count_current);
-                       if (ret < 0) {
+                                       stream->tracefile_count_current,
+                                       CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                       if (!index_file) {
                                goto error;
                        }
-                       stream->index_fd = ret;
+                       stream->index_file = index_file;
                }
        }
 
index 0e11d3c7ac2f8179ea98bda74293c91158d6b2c2..8ae6535d0de57ed1878bf42b98d37163fadccb49 100644 (file)
@@ -78,6 +78,8 @@ void *zmalloc(size_t len)
 #define LTTNG_HIDDEN __attribute__((visibility("hidden")))
 #endif
 
+#define member_sizeof(type, field)     sizeof(((type *) 0)->field)
+
 /*
  * lttng_strncpy returns 0 on success, or nonzero on failure.
  * It checks that the @src string fits into @dst_len before performing
index 7f0ea74e94085b99873049a736941015e85fda5c..2adcbe415c63f16a176dba9167c247f34eb0eba3 100644 (file)
@@ -856,7 +856,11 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
        }
 
        /* Send command */
-       ret = send_command(rsock, RELAYD_SEND_INDEX, &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
+               lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
+                                                               rsock->minor),
+                               lttng_to_index_minor(rsock->major, rsock->minor)),
+                               0);
        if (ret < 0) {
                goto error;
        }
index 1fc48c442b4e52b68391683fd3a3f392ce3eb038..eb3e5f118bfdadbf0af3cfc7223d1df2792e182f 100644 (file)
@@ -159,10 +159,30 @@ struct lttcomm_relayd_index {
        uint64_t timestamp_end;
        uint64_t events_discarded;
        uint64_t stream_id;
+       /* 2.8+ */
        uint64_t stream_instance_id;
        uint64_t packet_seq_num;
 } LTTNG_PACKED;
 
+static inline size_t lttcomm_relayd_index_len(uint32_t major, uint32_t minor)
+{
+       if (major == 1) {
+               switch (minor) {
+               case 0:
+                       return offsetof(struct lttcomm_relayd_index, stream_id)
+                               + member_sizeof(struct lttcomm_relayd_index,
+                                               stream_id);
+               case 1:
+                       return offsetof(struct lttcomm_relayd_index, packet_seq_num)
+                               + member_sizeof(struct lttcomm_relayd_index,
+                                               packet_seq_num);
+               default:
+                       abort();
+               }
+       }
+       abort();
+}
+
 /*
  * Create session in 2.4 adds additionnal parameters for live reading.
  */
index 76eee2ad9f56d448a9f5c9913cd86685b0279e8c..82d56ebb796910b28b86f417f7ec0f64a08de42b 100644 (file)
@@ -2620,14 +2620,17 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                stream->tracefile_size_current = 0;
 
                if (!stream->metadata_flag) {
-                       ret = index_create_file(stream->chan->pathname,
+                       struct lttng_index_file *index_file;
+
+                       index_file = lttng_index_file_create(stream->chan->pathname,
                                        stream->name, stream->uid, stream->gid,
                                        stream->chan->tracefile_size,
-                                       stream->tracefile_count_current);
-                       if (ret < 0) {
+                                       stream->tracefile_count_current,
+                                       CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                       if (!index_file) {
                                goto error;
                        }
-                       stream->index_fd = ret;
+                       stream->index_file = index_file;
                }
        }
        ret = 0;
This page took 0.043666 seconds and 5 git commands to generate.