Fix: don't create index on snapshot
authorDavid Goulet <dgoulet@efficios.com>
Fri, 27 Sep 2013 19:46:13 +0000 (15:46 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 27 Sep 2013 22:28:16 +0000 (18:28 -0400)
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-relayd/cmd-2-4.c
src/bin/lttng-relayd/lttng-relayd.h
src/bin/lttng-relayd/main.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/snapshot.c
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h

index 45410e4e81ad912c8e7f45107ff59e50e6635f31..f199b8396707746bb1144ef7f8f2b7c1d321f89a 100644 (file)
@@ -46,6 +46,7 @@ int cmd_create_session_2_4(struct relay_command *cmd,
        strncpy(session->hostname, session_info.hostname,
                        sizeof(session->hostname));
        session->live_timer = be32toh(session_info.live_timer);
+       session->snapshot = be32toh(session_info.snapshot);
 
        ret = 0;
 
index da23240341a52b5f99dd1eaa0628b9be77b047eb..3c8c26c29ec0e6fe4d54b2ee42303895913fff21 100644 (file)
@@ -62,6 +62,16 @@ struct relay_session {
        struct rcu_head rcu_node;
        uint32_t viewer_attached;
        uint32_t stream_count;
+       /* Tell if this session is for a snapshot or not. */
+       unsigned int snapshot:1;
+
+       /*
+        * Indicate version protocol for this session. This is especially useful
+        * for the data thread that has no idea which version it operates on since
+        * linking control/data sockets is non trivial.
+        */
+       uint64_t minor;
+       uint64_t major;
 };
 
 /*
index 59888c280c341cc1c16a5c89d652a88802787ec2..dffcaff9855a61b265bba6b3eb376a990188a76e 100644 (file)
@@ -904,6 +904,8 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
 
        session->id = ++last_relay_session_id;
        session->sock = cmd->sock;
+       session->minor = cmd->minor;
+       session->major = cmd->major;
        cmd->session = session;
 
        reply.session_id = htobe64(session->id);
@@ -1832,17 +1834,98 @@ end:
        return ret;
 }
 
+/*
+ * Handle index for a data stream.
+ *
+ * RCU read side lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
+               int rotate_index)
+{
+       int ret = 0, index_created = 0;
+       uint64_t stream_id, data_offset;
+       struct relay_index *index, *wr_index = NULL;
+
+       assert(stream);
+
+       stream_id = stream->stream_handle;
+       /* Get data offset because we are about to update the index. */
+       data_offset = htobe64(stream->tracefile_size_current);
+
+       /*
+        * Lookup for an existing index for that stream id/sequence number. If on
+        * exists, the control thread already received the data for it thus we need
+        * to write it on disk.
+        */
+       index = relay_index_find(stream_id, net_seq_num);
+       if (!index) {
+               /* A successful creation will add the object to the HT. */
+               index = relay_index_create(stream_id, net_seq_num);
+               if (!index) {
+                       ret = -1;
+                       goto error;
+               }
+               index_created = 1;
+       }
+
+       if (rotate_index || stream->index_fd < 0) {
+               index->to_close_fd = stream->index_fd;
+               ret = index_create_file(stream->path_name, stream->channel_name,
+                               relayd_uid, relayd_gid, stream->tracefile_size,
+                               stream->tracefile_count_current);
+               if (ret < 0) {
+                       /* This will close the stream's index fd if one. */
+                       relay_index_free_safe(index);
+                       goto error;
+               }
+               stream->index_fd = ret;
+       }
+       index->fd = stream->index_fd;
+       index->index_data.offset = data_offset;
+
+       if (index_created) {
+               /*
+                * Try to add the relay index object to the hash table. If an object
+                * already exist, destroy back the index created and set the data.
+                */
+               relay_index_add(index, &wr_index);
+               if (wr_index) {
+                       /* Copy back data from the created index. */
+                       wr_index->fd = index->fd;
+                       wr_index->to_close_fd = index->to_close_fd;
+                       wr_index->index_data.offset = data_offset;
+                       free(index);
+               }
+       } else {
+               /* The index already exists so write it on disk. */
+               wr_index = index;
+       }
+
+       /* Do we have a writable ready index to write on disk. */
+       if (wr_index) {
+               ret = relay_index_write(wr_index->fd, wr_index);
+               if (ret < 0) {
+                       goto error;
+               }
+               stream->total_index_received++;
+       }
+
+error:
+       return ret;
+}
+
 /*
  * relay_process_data: Process the data received on the data socket
  */
 static
 int relay_process_data(struct relay_command *cmd)
 {
-       int ret = 0, rotate_index = 0, index_created = 0;
+       int ret = 0, rotate_index = 0;
        struct relay_stream *stream;
-       struct relay_index *index, *wr_index = NULL;
        struct lttcomm_relayd_data_hdr data_hdr;
-       uint64_t stream_id, data_offset;
+       uint64_t stream_id;
        uint64_t net_seq_num;
        uint32_t data_size;
 
@@ -1915,77 +1998,18 @@ int relay_process_data(struct relay_command *cmd)
                rotate_index = 1;
        }
 
-       /* Get data offset because we are about to update the index. */
-       data_offset = htobe64(stream->tracefile_size_current);
-
        /*
-        * Lookup for an existing index for that stream id/sequence number. If on
-        * exists, the control thread already received the data for it thus we need
-        * to write it on disk.
+        * Index are handled in protocol version 2.4 and above. Also, snapshot and
+        * index are NOT supported.
         */
-       index = relay_index_find(stream_id, net_seq_num);
-       if (!index) {
-               /* A successful creation will add the object to the HT. */
-               index = relay_index_create(stream->stream_handle, net_seq_num);
-               if (!index) {
-                       goto end_rcu_unlock;
-               }
-               index_created = 1;
-       }
-
-       if (rotate_index || stream->index_fd < 0) {
-               index->to_close_fd = stream->index_fd;
-               ret = index_create_file(stream->path_name, stream->channel_name,
-                               relayd_uid, relayd_gid, stream->tracefile_size,
-                               stream->tracefile_count_current);
+       if (stream->session->minor >= 4 && !stream->session->snapshot) {
+               ret = handle_index_data(stream, net_seq_num, rotate_index);
                if (ret < 0) {
-                       /* This will close the stream's index fd if one. */
-                       relay_index_free_safe(index);
                        goto end_rcu_unlock;
                }
-               stream->index_fd = ret;
-       }
-       index->fd = stream->index_fd;
-       index->index_data.offset = data_offset;
-
-       if (index_created) {
-               /*
-                * Try to add the relay index object to the hash table. If an object
-                * already exist, destroy back the index created and set the data.
-                */
-               relay_index_add(index, &wr_index);
-               if (wr_index) {
-                       /* Copy back data from the created index. */
-                       wr_index->fd = index->fd;
-                       wr_index->to_close_fd = index->to_close_fd;
-                       wr_index->index_data.offset = data_offset;
-                       free(index);
-               }
-       } else {
-               /* The index already exists so write it on disk. */
-               wr_index = index;
-       }
-
-       /* Do we have a writable ready index to write on disk. */
-       if (wr_index) {
-               /* Starting at 2.4, create the index file if none available. */
-               if (cmd->minor >= 4 && stream->index_fd < 0) {
-                       ret = index_create_file(stream->path_name, stream->channel_name,
-                                       relayd_uid, relayd_gid, stream->tracefile_size,
-                                       stream->tracefile_count_current);
-                       if (ret < 0) {
-                               goto end_rcu_unlock;
-                       }
-                       stream->index_fd = ret;
-               }
-
-               ret = relay_index_write(wr_index->fd, wr_index);
-               if (ret < 0) {
-                       goto end_rcu_unlock;
-               }
-               stream->total_index_received++;
        }
 
+       /* Write data to stream output fd. */
        do {
                ret = write(stream->fd, data_buffer, data_size);
        } while (ret < 0 && errno == EINTR);
@@ -2350,15 +2374,9 @@ error:
                                        &iter, relay_connection, sessions_ht);
                }
        }
-error_poll_create:
-       {
-               struct relay_index *index;
-               cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
-                       relay_index_delete(index);
-               }
-               lttng_ht_destroy(indexes_ht);
-       }
        rcu_read_unlock();
+error_poll_create:
+       lttng_ht_destroy(indexes_ht);
 indexes_ht_error:
        lttng_ht_destroy(relay_connections_ht);
 relay_connections_ht_error:
index f0870e6b0c0abcf2ae50e83c32df5ef2b162a33e..3a500092fea6aa6391a0708863820fc601e48613 100644 (file)
@@ -960,7 +960,8 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
        if (type == LTTNG_STREAM_CONTROL) {
                ret = relayd_create_session(rsock,
                                &msg.u.relayd_sock.relayd_session_id,
-                               session_name, hostname, session_live_timer);
+                               session_name, hostname, session_live_timer,
+                               consumer->snapshot);
                if (ret < 0) {
                        /* Close the control socket. */
                        (void) relayd_close(rsock);
index 4d6838f6aff14636b5cd2db35a4f995f5fc839df..484d8f7bad90b149c5d72e6571bcc23ff60f6979 100644 (file)
@@ -163,6 +163,9 @@ struct consumer_output {
         */
        struct lttng_ht *socks;
 
+       /* Tell if this output is used for snapshot. */
+       unsigned int snapshot:1;
+
        union {
                char trace_path[PATH_MAX];
                struct consumer_net net;
index d22b5d52c98b730d122d91605c0f6851ef578c61..6b1aa8db1085c5edf2c01e404e3ce9387561c17d 100644 (file)
@@ -80,6 +80,7 @@ static int output_init(uint64_t max_size, const char *name,
                ret = -ENOMEM;
                goto error;
        }
+       output->consumer->snapshot = 1;
 
        /* No URL given. */
        if (nb_uri == 0) {
index a4c8a9261c29a9e58fc96e3023a68d8da8c44d68..1f427e0e25cb8e1d2e502760294918be10a5172d 100644 (file)
@@ -122,7 +122,7 @@ error:
  */
 static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
                uint64_t *session_id, char *session_name, char *hostname,
-               int session_live_timer)
+               int session_live_timer, unsigned int snapshot)
 {
        int ret;
        struct lttcomm_relayd_create_session_2_4 msg;
@@ -130,6 +130,7 @@ static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
        strncpy(msg.session_name, session_name, sizeof(msg.session_name));
        strncpy(msg.hostname, hostname, sizeof(msg.hostname));
        msg.live_timer = htobe32(session_live_timer);
+       msg.snapshot = htobe32(snapshot);
 
        /* Send command */
        ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
@@ -167,7 +168,8 @@ error:
  * a lttng error code from the relayd.
  */
 int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id,
-               char *session_name, char *hostname, int session_live_timer)
+               char *session_name, char *hostname, int session_live_timer,
+               unsigned int snapshot)
 {
        int ret;
        struct lttcomm_relayd_status_session reply;
@@ -184,9 +186,8 @@ int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_i
                        ret = relayd_create_session_2_1(rsock, session_id);
                case 4:
                default:
-                       ret = relayd_create_session_2_4(rsock, session_id,
-                                       session_name, hostname,
-                                       session_live_timer);
+                       ret = relayd_create_session_2_4(rsock, session_id, session_name,
+                                       hostname, session_live_timer, snapshot);
        }
 
        if (ret < 0) {
index d59f4ae2f2dc45b80134385e0815187dcbbb6f1c..d12d7a495d80fdb3e19e191d49b7f22e010125c1 100644 (file)
@@ -26,7 +26,8 @@
 int relayd_connect(struct lttcomm_relayd_sock *sock);
 int relayd_close(struct lttcomm_relayd_sock *sock);
 int relayd_create_session(struct lttcomm_relayd_sock *sock, uint64_t *session_id,
-               char *session_name, char *hostname, int session_live_timer);
+               char *session_name, char *hostname, int session_live_timer,
+               unsigned int snapshot);
 int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name,
                const char *pathname, uint64_t *stream_id,
                uint64_t tracefile_size, uint64_t tracefile_count);
index bc27b0ed4e0ef764d0bf0e1af92bb30ec93ccdc2..24c4c6e84336584f986921bddbdbf212b9ad686b 100644 (file)
@@ -171,6 +171,7 @@ struct lttcomm_relayd_create_session_2_4 {
        char session_name[NAME_MAX];
        char hostname[HOST_NAME_MAX];
        uint32_t live_timer;
+       uint32_t snapshot;
 } LTTNG_PACKED;
 
 #endif /* _RELAYD_COMM */
This page took 0.032832 seconds and 5 git commands to generate.