relayd: Implement custom EfficiOS session clear
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 3 Apr 2019 20:31:01 +0000 (16:31 -0400)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Wed, 10 Apr 2019 23:36:37 +0000 (19:36 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
12 files changed:
src/bin/lttng-relayd/ctf-trace.c
src/bin/lttng-relayd/ctf-trace.h
src/bin/lttng-relayd/live.c
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/session.c
src/bin/lttng-relayd/session.h
src/bin/lttng-relayd/stream.c
src/bin/lttng-relayd/stream.h
src/bin/lttng-relayd/tracefile-array.c
src/bin/lttng-relayd/tracefile-array.h
src/bin/lttng-relayd/viewer-stream.c
src/common/sessiond-comm/sessiond-comm.h

index 27009efc38270bd0e46adc6b046f59134c8b5b12..600d0bcae855938042f06252ad5cc495b521075b 100644 (file)
@@ -211,3 +211,25 @@ end:
        rcu_read_unlock();
        return vstream;
 }
+
+int ctf_trace_clear(struct ctf_trace *trace)
+{
+       struct relay_stream *stream;
+       int ret = 0;
+
+       rcu_read_lock();
+       cds_list_for_each_entry_rcu(stream, &trace->stream_list,
+                       stream_node) {
+               if (!stream_get(stream)) {
+                       continue;
+               }
+               ret = stream_clear(stream);
+               stream_put(stream);
+               if (ret) {
+                       goto unlock;
+               }
+       }
+unlock:
+       rcu_read_unlock();
+       return ret;
+}
index d051f80837e4bcbbae88a7a12c12b6504968789a..33366cddbabf0104ecf40c093f34842076edc99d 100644 (file)
@@ -69,4 +69,6 @@ int ctf_trace_close(struct ctf_trace *trace);
 
 struct relay_viewer_stream *ctf_trace_get_viewer_metadata_stream(struct ctf_trace *trace);
 
+int ctf_trace_clear(struct ctf_trace *trace);
+
 #endif /* _CTF_TRACE_H */
index f81872b9d1b4cfffa138dcb99eb40cc8b863db77..d682d3b09bd22e0ee3a4fcc534ecc80b47a56d70 100644 (file)
@@ -1149,9 +1149,9 @@ static int try_open_index(struct relay_viewer_stream *vstream,
        }
 
        /*
-        * First time, we open the index file and at least one index is ready.
+        * Deal with streams that have not received any index so far (or after clear).
         */
-       if (rstream->index_received_seqcount == 0) {
+       if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount) {
                ret = -ENOENT;
                goto end;
        }
@@ -1184,6 +1184,11 @@ static int check_index_status(struct relay_viewer_stream *vstream,
 {
        int ret;
 
+       DBG("check status: recv %" PRIu64 " sent %" PRIu64 " clear index %" PRIu64 " clear data %" PRIu64 " for stream %" PRIu64,
+                               rstream->index_received_seqcount,
+                               vstream->index_sent_seqcount,
+                               rstream->clear_position_index_seqcount,
+                               rstream->clear_position_data_seqcount, vstream->stream->stream_handle);
        if ((trace->session->connection_closed || rstream->closed)
                        && rstream->index_received_seqcount
                                == vstream->index_sent_seqcount) {
@@ -1195,7 +1200,7 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                goto hup;
        } else if (rstream->beacon_ts_end != -1ULL &&
                        rstream->index_received_seqcount
-                               == vstream->index_sent_seqcount) {
+                               <= vstream->index_sent_seqcount) {
                /*
                 * We've received a synchronization beacon and the last index
                 * available has been sent, the index for now is inactive.
@@ -1204,19 +1209,33 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                 * inform the client of a time interval during which we can
                 * guarantee that there are no events to read (and never will
                 * be).
+                *
+                * The sent seqcount can grow higher than receive seqcount on clear.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
                index->timestamp_end = htobe64(rstream->beacon_ts_end);
                index->stream_id = htobe64(rstream->ctf_stream_id);
+               DBG("INACTIVE: with beacon, r v recv eq for stream %" PRIu64, vstream->stream->stream_handle);
                goto index_ready;
        } else if (rstream->index_received_seqcount
-                       == vstream->index_sent_seqcount) {
+                       <= vstream->index_sent_seqcount) {
                /*
-                * This checks whether received == sent seqcount. In
+                * This checks whether received <= sent seqcount. In
                 * this case, we have not received a beacon. Therefore,
                 * we can only ask the client to retry later.
+                *
+                * The sent seqcount can grow higher than receive seqcount on clear.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               DBG("RETRY: r v recv leq for stream %" PRIu64, vstream->stream->stream_handle);
+               goto index_ready;
+       } else if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount ||
+                       rstream->index_received_seqcount <= rstream->clear_position_data_seqcount) {
+               /*
+                * Send "retry" reply if a clear operation is in progress.
+                */
+               index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               DBG("RETRY: r <= clear pos for stream %" PRIu64, vstream->stream->stream_handle);
                goto index_ready;
        } else if (!tracefile_array_seq_in_file(rstream->tfa,
                        vstream->current_tracefile_id,
@@ -1256,6 +1275,7 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                                        vstream->current_tracefile_id,
                                        vstream->index_sent_seqcount)) {
                        index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       DBG("RETRY: !tfs seq in file for stream %" PRIu64, vstream->stream->stream_handle);
                        goto index_ready;
                }
                assert(tracefile_array_seq_in_file(rstream->tfa,
@@ -1329,6 +1349,31 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
+       /*
+        * In case the stream has been cleared, we need to push the viewer
+        * stream index sent seqcount forward. Note that this can temporarily
+        * bring the index_sent position beyond the index received position.
+        */
+       if (rstream->clear_position_index_seqcount >= vstream->index_sent_seqcount) {
+               vstream->index_sent_seqcount = rstream->clear_position_index_seqcount;
+               /*
+                * Close the currently open index and data files to ensure we
+                * sync up with the receive side.
+                */
+               if (vstream->index_file) {
+                       lttng_index_file_put(vstream->index_file);
+                       vstream->index_file = NULL;
+               }
+               if (vstream->stream_fd) {
+                       stream_fd_put(vstream->stream_fd);
+                       vstream->stream_fd = NULL;
+               }
+               /*
+                * In tracefile rotation, we reset the current tracefile to 0.
+                */
+               vstream->current_tracefile_id = 0;
+       }
+
        /* Try to open an index if one is needed for that stream. */
        ret = try_open_index(vstream, rstream);
        if (ret < 0) {
@@ -1336,9 +1381,10 @@ int viewer_get_next_index(struct relay_connection *conn)
                        /*
                         * The index is created only when the first data
                         * packet arrives, it might not be ready at the
-                        * beginning of the session
+                        * beginning of the session. Let check_index_status
+                        * deal with inactivity beacons.
                         */
-                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       goto check_status;
                } else {
                        /* Unhandled error. */
                        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
@@ -1346,6 +1392,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
+check_status:
        ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
        if (ret < 0) {
                goto error_put;
index 705b5d6c52247be9e1d318b7a8dd94ec39123129..0e95c948173659496c476823a330af4ab9f37ad2 100644 (file)
@@ -1451,6 +1451,51 @@ end_no_session:
        return ret;
 }
 
+/*
+ * relay_clear_session: clear all data files belonging to a session.
+ */
+static
+int relay_clear_session(const struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       int ret;
+       ssize_t send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_generic_reply reply;
+
+       DBG("Clear session received");
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to clear session before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       if (!opt_allow_clear) {
+               ERR("Trying to clear session, but clear is disallowed.");
+               ret = -1;
+               goto end_no_session;
+       }
+       ret = session_clear(session);
+
+       memset(&reply, 0, sizeof(reply));
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_UNK);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+       send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply), 0);
+       if (send_ret < (ssize_t) sizeof(reply)) {
+               ERR("Failed to send \"clear session\" command reply (ret = %zd)",
+                               send_ret);
+               ret = -1;
+       }
+
+end_no_session:
+       return ret;
+}
+
 /*
  * relay_unknown_command: send -1 if received unknown command
  */
@@ -2049,6 +2094,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                ret = 0;
                goto end_stream_put;
        } else {
+               DBG("Received index for stream %" PRIu64,
+                               stream->stream_handle);
                stream->beacon_ts_end = -1ULL;
        }
 
@@ -2069,8 +2116,13 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
+               /* Clear index and data file(s) if reaching the clear position. */
+               ret = try_stream_clear_index_data(stream);
+               if (ret) {
+                       goto end_stream_put;
+               }
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
@@ -2084,6 +2136,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                ERR("relay_index_try_flush error %d", ret);
                ret = -1;
        }
+       stream->prev_index_seq = net_seq_num;
 
 end_stream_put:
        pthread_mutex_unlock(&stream->lock);
@@ -2197,6 +2250,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_RESET_METADATA:
                ret = relay_reset_metadata(recv_hdr, conn);
                break;
+       case RELAYD_CLEAR_SESSION_CUSTOM_EFFICIOS:
+               ret = relay_clear_session(recv_hdr, conn);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
@@ -2275,8 +2331,13 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
+               /* Clear index and data file(s) if reaching the clear position. */
+               ret = try_stream_clear_index_data(stream);
+               if (ret) {
+                       goto end;
+               }
        } else if (ret > 0) {
                /* No flush. */
                ret = 0;
@@ -2337,7 +2398,7 @@ static int relay_process_data(struct relay_connection *conn)
 
        net_seq_num = be64toh(data_hdr.net_seq_num);
 
-       DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
+       DBG("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
                data_size, stream_id, net_seq_num);
 
        pthread_mutex_lock(&stream->lock);
@@ -2368,6 +2429,7 @@ static int relay_process_data(struct relay_connection *conn)
                 * rotation.
                 */
                stream->tracefile_size_current = 0;
+               stream->tracefile_count_current = new_id;
                rotate_index = 1;
        }
 
index f76fb4a42e7606b8ebfea427033b00e3de192020..8edf06da990f6ffe72b758467147e7087451854a 100644 (file)
@@ -272,3 +272,22 @@ void print_sessions(void)
        }
        rcu_read_unlock();
 }
+
+int session_clear(struct relay_session *session)
+{
+       struct ctf_trace *trace;
+       struct lttng_ht_iter iter;
+       int ret = 0;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(session->ctf_traces_ht->ht,
+                       &iter.iter, trace, node.node) {
+               ret = ctf_trace_clear(trace);
+               if (ret) {
+                       goto rcu_unlock;
+               }
+       }
+rcu_unlock:
+       rcu_read_unlock();
+       return ret;
+}
index 4c8f8a61f402dcd98f36fbba780a86137fb02ad0..34424654eb3acf0d64e28b52494a72e34334f78d 100644 (file)
@@ -122,4 +122,6 @@ int session_abort(struct relay_session *session);
 
 void print_sessions(void);
 
+int session_clear(struct relay_session *session);
+
 #endif /* _SESSION_H */
index f989bfd1361be36bd20d4edfa018f36b11de062b..1218c2f4534f6b9537fbdc4f55b3bac4e046437c 100644 (file)
@@ -91,6 +91,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 
        stream->stream_handle = stream_handle;
        stream->prev_seq = -1ULL;
+       stream->prev_index_seq = -1ULL;
        stream->last_net_seq_num = -1ULL;
        stream->ctf_stream_id = -1ULL;
        stream->tracefile_size = tracefile_size;
@@ -477,3 +478,212 @@ void print_relay_streams(void)
        }
        rcu_read_unlock();
 }
+
+static int relay_unlink_stream_files_rotation(struct relay_stream *stream)
+{
+       uint64_t tracefile_size = stream->tracefile_size;
+       uint64_t tracefile_count = stream->tracefile_count;
+       uint64_t count;
+       int ret;
+
+       /*
+        * If the channel is configured to have an open-ended number of tracefiles,
+        * use the current tracefile count number as upper-bound.
+        */
+       if (!tracefile_count) {
+               tracefile_count = stream->tracefile_count_current + 1;
+       }
+
+       /*
+        * Try to unlink each file and each index for this stream. They may not exist,
+        * in which case ENOENT is fine.
+        */
+       for (count = 0; count < tracefile_count; count++) {
+               ret = utils_unlink_stream_file(stream->path_name, stream->channel_name,
+                               tracefile_size, count, -1, -1, NULL);
+               if (ret < 0 && errno != ENOENT) {
+                       return -1;
+               }
+       }
+       return 0;
+}
+
+static int relay_unlink_index_files_rotation(struct relay_stream *stream)
+{
+       uint64_t tracefile_size = stream->tracefile_size;
+       uint64_t tracefile_count = stream->tracefile_count;
+       uint64_t count;
+       int ret;
+
+       /*
+        * If the channel is configured to have an open-ended number of tracefiles,
+        * use the current tracefile count number as upper-bound.
+        */
+       if (!tracefile_count) {
+               tracefile_count = stream->tracefile_count_current + 1;
+       }
+
+       /*
+        * Try to unlink each file and each index for this stream. They may not exist,
+        * in which case ENOENT is fine.
+        */
+       for (count = 0; count < tracefile_count; count++) {
+               if (stream->index_file) {
+                       ret = lttng_index_file_unlink(stream->path_name, stream->channel_name,
+                                       -1, -1, tracefile_size, count);
+                       if (ret < 0 && errno != ENOENT) {
+                               return -1;
+                       }
+               }
+       }
+       return 0;
+}
+
+static int relay_unlink_stream_files(struct relay_stream *stream)
+{
+       int ret;
+
+       ret = utils_unlink_stream_file(stream->path_name, stream->channel_name,
+                       stream->tracefile_size, 0, -1, -1, NULL);
+       if (ret < 0 && errno != ENOENT) {
+               return -1;
+       }
+       return 0;
+}
+
+static int relay_unlink_index_files(struct relay_stream *stream)
+{
+       int ret;
+
+       ret = lttng_index_file_unlink(stream->path_name, stream->channel_name,
+                       -1, -1, stream->tracefile_size, 0);
+       if (ret < 0 && errno != ENOENT) {
+               return -1;
+       }
+       return 0;
+}
+
+int try_stream_clear_index_data(struct relay_stream *stream)
+{
+       int ret = 0;
+
+       DBG("try stream clear for handle %" PRIu64 " recv %" PRIu64 " clear_pos_idx %" PRIu64 " clear_pos_data %" PRIu64,
+                       stream->stream_handle, stream->index_received_seqcount, stream->clear_position_index_seqcount,
+                       stream->clear_position_data_seqcount);
+       if (!stream->index_received_seqcount) {
+               return 0;
+       }
+       if (stream->index_received_seqcount <= stream->clear_position_index_seqcount) {
+               /*
+                * Put ref on current index file. The new index file will be created upon
+                * reception of next index data beyond the clear position.
+                */
+               if (stream->index_file) {
+                       lttng_index_file_put(stream->index_file);
+                       if (stream->tracefile_size > 0) {
+                               ret = relay_unlink_index_files_rotation(stream);
+                       } else {
+                               ret = relay_unlink_index_files(stream);
+                       }
+                       if (ret) {
+                               return ret;
+                       }
+                       stream->index_file = NULL;
+               }
+               tracefile_array_reset(stream->tfa);
+       }
+       if (stream->index_received_seqcount == stream->clear_position_data_seqcount) {
+               ret = close(stream->stream_fd->fd);
+               if (ret < 0) {
+                       PERROR("Closing tracefile");
+                       return -1;
+               }
+               stream->stream_fd->fd = -1;
+               stream->tracefile_size_current = 0;
+
+               if (stream->tracefile_size > 0) {
+                       ret = relay_unlink_stream_files_rotation(stream);
+               } else {
+                       ret = relay_unlink_stream_files(stream);
+               }
+
+               /* Create new files. */
+               ret = utils_create_stream_file(stream->path_name, stream->channel_name,
+                               stream->tracefile_size, 0, -1, -1, NULL);
+               if (ret < 0) {
+                       ERR("Create output file");
+                       return -1;
+               }
+               stream->stream_fd->fd = ret;
+       }
+       return 0;
+}
+
+int stream_clear(struct relay_stream *stream)
+{
+       int ret = 0;
+
+       pthread_mutex_lock(&stream->lock);
+
+       if (stream->is_metadata) {
+               /* Do not clear metadata streams. */
+               goto end;
+       }
+
+       /*
+        * Clear index and data for all packets up to and including the
+        * clear position index seqcount.
+        *
+        * Clearing the index is straightforward: we can remove the entire
+        * on-disk index for this stream because the control port is an ordered
+        * protocol. We may also have in-flight indexes within the indexes_ht
+        * (pending data reception). We need to mark those so they get
+        * discarded (as well as their associated data content) upon reception
+        * of matching data.
+        *
+        * Clearing the data: because data is written directly into the output files,
+        * we need to carefully handle cases where index or data positions are ahead
+        * of the other.
+        *
+        * In tracefile rotation mode, we need to move the seq_tail to the head
+        * position.
+        */
+
+       /*
+        * If the data received is beyond indexes received, unlink data immediately and
+        * discard indexes when they arrive (up to the clear position).
+        *
+        * If indexes received is beyond data, we will reach the sync point when the
+        * indexes are received, so it will be safe to unlink the data and index files
+        * at that point.
+        *
+        * Clear index and data file(s) immediately if reaching the clear
+        * position (no in-flight indexes).
+        */
+       DBG("stream clear for handle %" PRIu64 " prev_seq %" PRIu64 " prev_index_seq %" PRIu64 " indexes in flight %d",
+                       stream->stream_handle, stream->prev_seq, stream->prev_index_seq,
+                       stream->indexes_in_flight);
+       if (stream->prev_seq > stream->prev_index_seq) {
+               stream->clear_position_data_seqcount = stream->index_received_seqcount;
+               stream->clear_position_index_seqcount = stream->index_received_seqcount +
+                                                       stream->indexes_in_flight;
+       } else if (stream->prev_seq < stream->prev_index_seq) {
+               stream->clear_position_data_seqcount = stream->index_received_seqcount +
+                                                       stream->indexes_in_flight;
+               stream->clear_position_index_seqcount = stream->index_received_seqcount +
+                                                       stream->indexes_in_flight;
+       } else {
+               assert(stream->indexes_in_flight == 0);
+               stream->clear_position_data_seqcount = stream->index_received_seqcount;
+               stream->clear_position_index_seqcount = stream->index_received_seqcount;
+
+       }
+       ret = try_stream_clear_index_data(stream);
+       if (ret) {
+               goto end;
+       }
+
+end:
+       pthread_mutex_unlock(&stream->lock);
+       return ret;
+}
index d471c7b7f49561c7c1882b5c584822daf037fcb5..e401c51af766c6da058cab78e0c344e57f0dc1c4 100644 (file)
@@ -54,6 +54,7 @@ struct relay_stream {
         */
        pthread_mutex_t lock;
        uint64_t prev_seq;              /* previous data sequence number encountered. */
+       uint64_t prev_index_seq;        /* previous index sequence number encountered. */
        uint64_t last_net_seq_num;      /* seq num to encounter before closing. */
 
        /* FD on which to write the stream data. */
@@ -68,6 +69,7 @@ struct relay_stream {
        uint64_t tracefile_size;
        uint64_t tracefile_size_current;
        uint64_t tracefile_count;
+       uint64_t tracefile_count_current;
 
        /*
         * Counts the number of received indexes. The "tag" associated
@@ -77,6 +79,14 @@ struct relay_stream {
         */
        uint64_t index_received_seqcount;
 
+
+       /*
+        * Index sequence number for the position at which clear needs to be
+        * performed.
+        */
+       uint64_t clear_position_index_seqcount;
+       uint64_t clear_position_data_seqcount;
+
        /*
         * Tracefile array is an index of the stream trace files,
         * indexed by position. It allows keeping track of the oldest
@@ -148,5 +158,7 @@ void stream_put(struct relay_stream *stream);
 void try_stream_close(struct relay_stream *stream);
 void stream_publish(struct relay_stream *stream);
 void print_relay_streams(void);
+int stream_clear(struct relay_stream *stream);
+int try_stream_clear_index_data(struct relay_stream *stream);
 
 #endif /* _STREAM_H */
index 20b760c06893f2387968f3f773dd2cd846675fa1..3c62af142cb358bf717c32301d727d3729c4e5c5 100644 (file)
@@ -62,6 +62,21 @@ void tracefile_array_destroy(struct tracefile_array *tfa)
        free(tfa);
 }
 
+void tracefile_array_reset(struct tracefile_array *tfa)
+{
+       size_t count, i;
+
+       count = tfa->count;
+       for (i = 0; i < count; i++) {
+               tfa->tf[i].seq_head = -1ULL;
+               tfa->tf[i].seq_tail = -1ULL;
+       }
+       tfa->seq_head = -1ULL;
+       tfa->seq_tail = -1ULL;
+       tfa->file_head = 0;
+       tfa->file_tail = 0;
+}
+
 void tracefile_array_file_rotate(struct tracefile_array *tfa)
 {
        uint64_t *headp, *tailp;
@@ -90,15 +105,16 @@ void tracefile_array_file_rotate(struct tracefile_array *tfa)
        *tailp = -1ULL;
 }
 
-void tracefile_array_commit_seq(struct tracefile_array *tfa)
+void tracefile_array_commit_seq(struct tracefile_array *tfa,
+               uint64_t new_seq_head)
 {
        uint64_t *headp, *tailp;
 
        /* Increment overall head. */
-       tfa->seq_head++;
-       /* If we are committing our first index overall, set tail to 0. */
+       tfa->seq_head = new_seq_head;
+       /* If we are committing our first index overall, set tail to head. */
        if (tfa->seq_tail == -1ULL) {
-               tfa->seq_tail = 0;
+               tfa->seq_tail = new_seq_head;
        }
        if (!tfa->count) {
                /* Not in tracefile rotation mode. */
@@ -147,7 +163,6 @@ bool tracefile_array_seq_in_file(struct tracefile_array *tfa,
                 */
                return true;
        }
-       assert(file_index < tfa->count);
        if (seq == -1ULL) {
                return false;
        }
index 9158f4fe40c3c16bd5446ea811cc44cef379b966..9e73f0e06de8e08c50fbded1feb586975d7a4775 100644 (file)
@@ -49,7 +49,9 @@ struct tracefile_array *tracefile_array_create(size_t count);
 void tracefile_array_destroy(struct tracefile_array *tfa);
 
 void tracefile_array_file_rotate(struct tracefile_array *tfa);
-void tracefile_array_commit_seq(struct tracefile_array *tfa);
+void tracefile_array_commit_seq(struct tracefile_array *tfa,
+               uint64_t new_seq_head);
+void tracefile_array_reset(struct tracefile_array *tfa);
 
 uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa);
 /* May return -1ULL in the case where we have not received any indexes yet. */
index 8a3b09a92060ef25e9f9f8f50b2d36d15c2de159..e9d9da2547571ce78518202c068bcc85751117c2 100644 (file)
@@ -115,7 +115,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
         * If we never received an index for the current stream, delay
         * the opening of the index, otherwise open it right now.
         */
-       if (stream->index_received_seqcount == 0) {
+       if (stream->index_file == NULL) {
                vstream->index_file = NULL;
        } else {
                vstream->index_file = lttng_index_file_open(vstream->path_name,
index 51f680005f5e67113b912d988cf9b7c067b8def9..59c4aad71da97d9ff2e1d9250740460be3d94205 100644 (file)
@@ -124,6 +124,11 @@ enum lttcomm_relayd_command {
        RELAYD_STREAMS_SENT                 = 16,
        /* Ask the relay to reset the metadata trace file (2.8+) */
        RELAYD_RESET_METADATA               = 17,
+
+       /* Feature branch specific commands start at 10000. */
+
+       /* Ask the relay to clear files belonging to a session (feature branch). */
+       RELAYD_CLEAR_SESSION_CUSTOM_EFFICIOS= 10000,
 };
 
 /*
This page took 0.040785 seconds and 5 git commands to generate.