LTTNG_VIEWER_NAME_MAX);
viewer_stream->tracefile_count = stream->tracefile_count;
viewer_stream->metadata_flag = stream->metadata_flag;
+ viewer_stream->tracefile_count_last = -1ULL;
if (seek_last) {
viewer_stream->tracefile_count_current =
stream->tracefile_count_current;
/*
* Rotate a stream to the next tracefile.
*
- * Returns 0 on success, a negative value on error.
+ * Returns 0 on success, 1 on EOF, a negative value on error.
*/
static
int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream,
tracefile_id = (viewer_stream->tracefile_count_current + 1) %
viewer_stream->tracefile_count;
+ /*
+ * Detect the last tracefile to open.
+ */
+ if (viewer_stream->tracefile_count_last != -1ULL &&
+ viewer_stream->tracefile_count_last ==
+ viewer_stream->tracefile_count_current) {
+ ret = 1;
+ goto end;
+ }
if (stream) {
pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
}
}
viewer_stream->tracefile_count_current = tracefile_id;
- pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
- if (viewer_stream->abort_flag == 0) {
- ret = close(viewer_stream->index_read_fd);
- if (ret < 0) {
- PERROR("close index file");
- }
- ret = close(viewer_stream->read_fd);
- if (ret < 0) {
- PERROR("close tracefile");
- }
- } else {
- viewer_stream->abort_flag = 0;
+ ret = close(viewer_stream->index_read_fd);
+ if (ret < 0) {
+ PERROR("close index file %d",
+ viewer_stream->index_read_fd);
+ }
+ viewer_stream->index_read_fd = -1;
+ ret = close(viewer_stream->read_fd);
+ if (ret < 0) {
+ PERROR("close tracefile %d",
+ viewer_stream->read_fd);
}
+ viewer_stream->read_fd = -1;
+
+ pthread_mutex_lock(&viewer_stream->overwrite_lock);
+ viewer_stream->abort_flag = 0;
+ pthread_mutex_unlock(&viewer_stream->overwrite_lock);
+ viewer_stream->index_read_fd = -1;
viewer_stream->read_fd = -1;
+ if (stream) {
+ pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
+ }
ret = open_index(viewer_stream);
if (ret < 0) {
goto error;
ret = 0;
+end:
error:
return ret;
}
ret = rotate_viewer_stream(vstream, rstream);
if (ret < 0) {
goto end_unlock;
+ } else if (ret == 1) {
+ viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+ goto send_reply;
}
}
- if (rstream->beacon_ts_end != -1ULL &&
- vstream->last_sent_index == rstream->total_index_received) {
- viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE);
- viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
- goto send_reply;
- }
- /*
- * Reader and writer are working in the same tracefile, so we care
- * about the number of index received and sent. Otherwise, we read
- * up to EOF.
- */
pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
- if (rstream->tracefile_count_current == vstream->tracefile_count_current
- && rstream->total_index_received <= vstream->last_sent_index
- && !vstream->close_write_flag) {
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
- /* No new index to send, retry later. */
- viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
- goto send_reply;
+ if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
+ if (rstream->beacon_ts_end != -1ULL &&
+ vstream->last_sent_index == rstream->total_index_received) {
+ viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE);
+ viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
+ pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+ goto send_reply;
+ /*
+ * Reader and writer are working in the same tracefile, so we care
+ * about the number of index received and sent. Otherwise, we read
+ * up to EOF.
+ */
+ } else if (rstream->total_index_received <= vstream->last_sent_index
+ && !vstream->close_write_flag) {
+ pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+ /* No new index to send, retry later. */
+ viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+ goto send_reply;
+ }
}
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
} else if (!rstream && vstream->close_write_flag &&
viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
}
+ pthread_mutex_lock(&vstream->overwrite_lock);
+ if (vstream->abort_flag) {
+ /*
+ * The file is being overwritten by the writer, we cannot
+ * use it.
+ */
+ viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+ pthread_mutex_unlock(&vstream->overwrite_lock);
+ ret = rotate_viewer_stream(vstream, rstream);
+ if (ret < 0) {
+ goto end_unlock;
+ } else if (ret == 1) {
+ viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+ goto send_reply;
+ }
+ goto send_reply;
+ }
ret = lttng_read(vstream->index_read_fd, &packet_index,
sizeof(packet_index));
+ pthread_mutex_unlock(&vstream->overwrite_lock);
if (ret < sizeof(packet_index)) {
/*
* The tracefile is closed in write, so we read up to EOF.
ret = rotate_viewer_stream(vstream, rstream);
if (ret < 0) {
goto end_unlock;
- }
- } else {
- /*
- * If the read fd was closed by the streaming side, the
- * abort_flag will be set to 1, otherwise it is an error.
- */
- if (vstream->abort_flag != 1) {
- PERROR("Relay reading index file");
- viewer_index.status = htobe32(VIEWER_INDEX_ERR);
- goto send_reply;
- } else {
+ } else if (ret == 1) {
viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+ goto send_reply;
}
+ } else {
+ PERROR("Relay reading index file %d",
+ vstream->index_read_fd);
+ viewer_index.status = htobe32(VIEWER_INDEX_ERR);
}
goto send_reply;
} else {