X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=480c459ce6a49724f54d1b6ef71c0b5a6d458a1b;hp=e51ff5cdf53495d4848a2867cfc952e9f2534bc5;hb=cef0f7d51b8025d3ba04e6496242c1cca1641aa6;hpb=cc7f9e363bf9230260b2ef27c84e24f75a7535c0 diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index e51ff5cdf..480c459ce 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -801,23 +801,34 @@ int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream, } } viewer_stream->tracefile_count_current = tracefile_id; - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); if (viewer_stream->abort_flag == 0) { - ret = close(viewer_stream->index_read_fd); - if (ret < 0) { - PERROR("close index file"); + if (viewer_stream->index_read_fd > 0) { + ret = close(viewer_stream->index_read_fd); + if (ret < 0) { + PERROR("close index file %d", + viewer_stream->index_read_fd); + } + viewer_stream->index_read_fd = -1; } - ret = close(viewer_stream->read_fd); - if (ret < 0) { - PERROR("close tracefile"); + if (viewer_stream->read_fd > 0) { + ret = close(viewer_stream->read_fd); + if (ret < 0) { + PERROR("close tracefile %d", + viewer_stream->read_fd); + } + viewer_stream->read_fd = -1; } } else { viewer_stream->abort_flag = 0; } + viewer_stream->index_read_fd = -1; viewer_stream->read_fd = -1; + if (stream) { + pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); + } ret = open_index(viewer_stream); if (ret < 0) { goto error; @@ -1125,25 +1136,26 @@ int viewer_get_next_index(struct relay_command *cmd, goto end_unlock; } } - if (rstream->beacon_ts_end != -1ULL && - vstream->last_sent_index == rstream->total_index_received) { - viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE); - viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end); - goto send_reply; - } - /* - * Reader and writer are working in the same tracefile, so we care - * about the number of index received and sent. Otherwise, we read - * up to EOF. - */ pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); - if (rstream->tracefile_count_current == vstream->tracefile_count_current - && rstream->total_index_received <= vstream->last_sent_index - && !vstream->close_write_flag) { - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - /* No new index to send, retry later. */ - viewer_index.status = htobe32(VIEWER_INDEX_RETRY); - goto send_reply; + if (rstream->tracefile_count_current == vstream->tracefile_count_current) { + if (rstream->beacon_ts_end != -1ULL && + vstream->last_sent_index == rstream->total_index_received) { + viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE); + viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end); + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); + goto send_reply; + /* + * Reader and writer are working in the same tracefile, so we care + * about the number of index received and sent. Otherwise, we read + * up to EOF. + */ + } else if (rstream->total_index_received <= vstream->last_sent_index + && !vstream->close_write_flag) { + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); + /* No new index to send, retry later. */ + viewer_index.status = htobe32(VIEWER_INDEX_RETRY); + goto send_reply; + } } pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); } else if (!rstream && vstream->close_write_flag && @@ -1161,8 +1173,23 @@ int viewer_get_next_index(struct relay_command *cmd, viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; } + pthread_mutex_lock(&vstream->overwrite_lock); + if (vstream->abort_flag) { + /* + * The file is being overwritten by the writer, we cannot + * use it. + */ + viewer_index.status = htobe32(VIEWER_INDEX_RETRY); + pthread_mutex_unlock(&vstream->overwrite_lock); + ret = rotate_viewer_stream(vstream, rstream); + if (ret < 0) { + goto end_unlock; + } + goto send_reply; + } ret = lttng_read(vstream->index_read_fd, &packet_index, sizeof(packet_index)); + pthread_mutex_unlock(&vstream->overwrite_lock); if (ret < sizeof(packet_index)) { /* * The tracefile is closed in write, so we read up to EOF. @@ -1175,17 +1202,9 @@ int viewer_get_next_index(struct relay_command *cmd, goto end_unlock; } } else { - /* - * If the read fd was closed by the streaming side, the - * abort_flag will be set to 1, otherwise it is an error. - */ - if (vstream->abort_flag != 1) { - PERROR("Relay reading index file"); - viewer_index.status = htobe32(VIEWER_INDEX_ERR); - goto send_reply; - } else { - viewer_index.status = htobe32(VIEWER_INDEX_HUP); - } + PERROR("Relay reading index file %d", + vstream->index_read_fd); + viewer_index.status = htobe32(VIEWER_INDEX_ERR); } goto send_reply; } else {