From cef0f7d51b8025d3ba04e6496242c1cca1641aa6 Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Thu, 14 Nov 2013 12:51:30 -0500 Subject: [PATCH] Fix concurrency issues while overwriting tracefiles in live Introduce a new lock to set the abort_flag in the viewer stream. This lock is nested inside the tracefile rotation of the writer and is taken before reading the tracefile in the viewer-side. That way, the viewer-side has just to check this flag to know if it is safe to read. Signed-off-by: Julien Desfossez Signed-off-by: David Goulet --- src/bin/lttng-relayd/live.c | 91 +++++++------ src/bin/lttng-relayd/lttng-relayd.h | 5 + src/bin/lttng-relayd/main.c | 17 +-- tests/fast_regression | 1 + tests/regression/tools/live/Makefile.am | 2 +- .../tools/live/test_ust_tracefile_count | 122 ++++++++++++++++++ 6 files changed, 190 insertions(+), 48 deletions(-) create mode 100755 tests/regression/tools/live/test_ust_tracefile_count diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index e51ff5cdf..480c459ce 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -801,23 +801,34 @@ int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream, } } viewer_stream->tracefile_count_current = tracefile_id; - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); if (viewer_stream->abort_flag == 0) { - ret = close(viewer_stream->index_read_fd); - if (ret < 0) { - PERROR("close index file"); + if (viewer_stream->index_read_fd > 0) { + ret = close(viewer_stream->index_read_fd); + if (ret < 0) { + PERROR("close index file %d", + viewer_stream->index_read_fd); + } + viewer_stream->index_read_fd = -1; } - ret = close(viewer_stream->read_fd); - if (ret < 0) { - PERROR("close tracefile"); + if (viewer_stream->read_fd > 0) { + ret = close(viewer_stream->read_fd); + if (ret < 0) { + PERROR("close tracefile %d", + viewer_stream->read_fd); + } + viewer_stream->read_fd = -1; } } else { viewer_stream->abort_flag = 0; } + viewer_stream->index_read_fd = -1; viewer_stream->read_fd = -1; + if (stream) { + pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); + } ret = open_index(viewer_stream); if (ret < 0) { goto error; @@ -1125,25 +1136,26 @@ int viewer_get_next_index(struct relay_command *cmd, goto end_unlock; } } - if (rstream->beacon_ts_end != -1ULL && - vstream->last_sent_index == rstream->total_index_received) { - viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE); - viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end); - goto send_reply; - } - /* - * Reader and writer are working in the same tracefile, so we care - * about the number of index received and sent. Otherwise, we read - * up to EOF. - */ pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); - if (rstream->tracefile_count_current == vstream->tracefile_count_current - && rstream->total_index_received <= vstream->last_sent_index - && !vstream->close_write_flag) { - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - /* No new index to send, retry later. */ - viewer_index.status = htobe32(VIEWER_INDEX_RETRY); - goto send_reply; + if (rstream->tracefile_count_current == vstream->tracefile_count_current) { + if (rstream->beacon_ts_end != -1ULL && + vstream->last_sent_index == rstream->total_index_received) { + viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE); + viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end); + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); + goto send_reply; + /* + * Reader and writer are working in the same tracefile, so we care + * about the number of index received and sent. Otherwise, we read + * up to EOF. + */ + } else if (rstream->total_index_received <= vstream->last_sent_index + && !vstream->close_write_flag) { + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); + /* No new index to send, retry later. */ + viewer_index.status = htobe32(VIEWER_INDEX_RETRY); + goto send_reply; + } } pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); } else if (!rstream && vstream->close_write_flag && @@ -1161,8 +1173,23 @@ int viewer_get_next_index(struct relay_command *cmd, viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; } + pthread_mutex_lock(&vstream->overwrite_lock); + if (vstream->abort_flag) { + /* + * The file is being overwritten by the writer, we cannot + * use it. + */ + viewer_index.status = htobe32(VIEWER_INDEX_RETRY); + pthread_mutex_unlock(&vstream->overwrite_lock); + ret = rotate_viewer_stream(vstream, rstream); + if (ret < 0) { + goto end_unlock; + } + goto send_reply; + } ret = lttng_read(vstream->index_read_fd, &packet_index, sizeof(packet_index)); + pthread_mutex_unlock(&vstream->overwrite_lock); if (ret < sizeof(packet_index)) { /* * The tracefile is closed in write, so we read up to EOF. @@ -1175,17 +1202,9 @@ int viewer_get_next_index(struct relay_command *cmd, goto end_unlock; } } else { - /* - * If the read fd was closed by the streaming side, the - * abort_flag will be set to 1, otherwise it is an error. - */ - if (vstream->abort_flag != 1) { - PERROR("Relay reading index file"); - viewer_index.status = htobe32(VIEWER_INDEX_ERR); - goto send_reply; - } else { - viewer_index.status = htobe32(VIEWER_INDEX_HUP); - } + PERROR("Relay reading index file %d", + vstream->index_read_fd); + viewer_index.status = htobe32(VIEWER_INDEX_ERR); } goto send_reply; } else { diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index 8039a7ec7..0af8e0f1d 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -166,6 +166,11 @@ struct relay_viewer_stream { struct lttng_ht_node_u64 stream_n; struct rcu_head rcu_node; struct ctf_trace *ctf_trace; + /* + * This lock blocks only when the writer is about to start overwriting + * a file currently read by the reader. + */ + pthread_mutex_t overwrite_lock; /* Information telling us if the stream is a metadata stream. */ unsigned int metadata_flag:1; /* diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 5782175e6..70a1948c3 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -849,7 +849,10 @@ static void destroy_stream(struct relay_stream *stream) * lookup failure on the live thread side of a stream indicates * that the viewer stream index received value should be used. */ + pthread_mutex_lock(&stream->viewer_stream_rotation_lock); vstream->total_index_received = stream->total_index_received; + vstream->close_write_flag = 1; + pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); } /* Cleanup index of that stream. */ @@ -2065,18 +2068,9 @@ int relay_process_data(struct relay_command *cmd) * currently using and let it handle the fault. */ if (vstream->tracefile_count_current == new_id) { + pthread_mutex_lock(&vstream->overwrite_lock); vstream->abort_flag = 1; - vstream->close_write_flag = 1; - - ret = close(vstream->read_fd); - if (ret < 0) { - PERROR("close index"); - } - - ret = close(vstream->index_read_fd); - if (ret < 0) { - PERROR("close tracefile"); - } + pthread_mutex_unlock(&vstream->overwrite_lock); DBG("Streaming side setting abort_flag on stream %s_%lu\n", stream->channel_name, new_id); } else if (vstream->tracefile_count_current == @@ -2094,6 +2088,7 @@ int relay_process_data(struct relay_command *cmd) stream->tracefile_size, stream->tracefile_count, relayd_uid, relayd_gid, stream->fd, &(stream->tracefile_count_current), &stream->fd); + stream->total_index_received = 0; pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); if (ret < 0) { ERR("Rotating stream output file"); diff --git a/tests/fast_regression b/tests/fast_regression index 678570c86..6c9fa3986 100644 --- a/tests/fast_regression +++ b/tests/fast_regression @@ -3,6 +3,7 @@ regression/tools/filtering/test_unsupported_op regression/tools/filtering/test_valid_filter regression/tools/streaming/test_ust regression/tools/live/test_ust +regression/tools/live/test_ust_tracefile_count regression/tools/live/test_lttng_ust regression/tools/tracefile-limits/test_tracefile_count regression/tools/tracefile-limits/test_tracefile_size diff --git a/tests/regression/tools/live/Makefile.am b/tests/regression/tools/live/Makefile.am index d8a952524..fdf9d3ade 100644 --- a/tests/regression/tools/live/Makefile.am +++ b/tests/regression/tools/live/Makefile.am @@ -25,7 +25,7 @@ noinst_PROGRAMS = live_test EXTRA_DIST = live_test test_kernel if HAVE_LIBLTTNG_UST_CTL -EXTRA_DIST += test_ust +EXTRA_DIST += test_ust test_ust_tracefile_count endif live_test_SOURCES = live_test.c diff --git a/tests/regression/tools/live/test_ust_tracefile_count b/tests/regression/tools/live/test_ust_tracefile_count new file mode 100755 index 000000000..464f4aacb --- /dev/null +++ b/tests/regression/tools/live/test_ust_tracefile_count @@ -0,0 +1,122 @@ +#!/bin/bash +# +# Copyright (C) - 2013 Julien Desfossez +# David Goulet +# +# This library is free software; you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +TEST_DESC="Live - User space tracing with tracefile_count" + +CURDIR=$(dirname $0)/ +TESTDIR=$CURDIR/../../../ +SESSIOND_BIN="lttng-sessiond" +RELAYD_BIN="lttng-relayd" +LTTNG_BIN="lttng" +BABELTRACE_BIN="babeltrace" +NR_ITER=1 +NR_USEC_WAIT=1 +DELAY_USEC=2000000 +TESTAPP_PATH="$TESTDIR/utils/testapp" +TESTAPP_NAME="gen-ust-events" +TESTAPP_BIN="$TESTAPP_PATH/$TESTAPP_NAME/$TESTAPP_NAME" + +SESSION_NAME="live" +EVENT_NAME="tp:tptest" + +TRACE_PATH=$(mktemp -d) + +DIR=$(readlink -f $TESTDIR) + +source $TESTDIR/utils/utils.sh + +echo "$TEST_DESC" + +function setup_live_tracing() +{ + # Create session with default path + $TESTDIR/../src/bin/lttng/$LTTNG_BIN create $SESSION_NAME --live $DELAY_USEC \ + -U net://localhost >/dev/null 2>&1 + + $TESTDIR/../src/bin/lttng/$LTTNG_BIN enable-channel --subbuf-size 16k -C 16k -W 10 -u chan1 + $TESTDIR/../src/bin/lttng/$LTTNG_BIN enable-event "$EVENT_NAME" -s $SESSION_NAME -u -c chan1 >/dev/null 2>&1 + $TESTDIR/../src/bin/lttng/$LTTNG_BIN start $SESSION_NAME >/dev/null 2>&1 +} + +function clean_live_tracing() +{ + $TESTDIR/../src/bin/lttng/$LTTNG_BIN stop $SESSION_NAME >/dev/null 2>&1 + $TESTDIR/../src/bin/lttng/$LTTNG_BIN destroy $SESSION_NAME >/dev/null 2>&1 + rm -rf $TRACE_PATH +} + +if [ -z $(pidof lt-$SESSIOND_BIN) ]; then + $DIR/../src/bin/lttng-sessiond/$SESSIOND_BIN --daemonize --quiet --consumerd32-path="$DIR/../src/bin/lttng-consumerd/lttng-consumerd" --consumerd64-path="$DIR/../src/bin/lttng-consumerd/lttng-consumerd" + if [ $? -eq 1 ]; then + echo "Fail to start lttng-sessiond" + exit 1 + fi + # Wait for sessiond to bootstrap + sleep 2 +fi + +opt="-o $TRACE_PATH" +if [ -z $(pidof lt-$RELAYD_BIN) ]; then + $DIR/../src/bin/lttng-relayd/$RELAYD_BIN $opt >/dev/null 2>&1 & + if [ $? -eq 1 ]; then + echo "Fail to start lttng-relayd (opt: $opt)" + return 1 + fi +fi + +setup_live_tracing + +# Run app in background +$TESTAPP_BIN $NR_ITER $NR_USEC_WAIT >/dev/null 2>&1 & +# Wait for app to complete +while [ -n "$(pidof $TESTAPP_NAME)" ]; do + sleep 0.5 +done + +# Start the live test +$TESTDIR/regression/tools/live/live_test + +clean_live_tracing + +# Kill the relayd +PID_RELAYD=`pidof lt-$RELAYD_BIN` +kill $PID_RELAYD >/dev/null 2>&1 +if [ $? -eq 1 ]; then + echo "Kill lttng-relayd (pid: $PID_RELAYD)" + exit 1 +else + out=1 + while [ -n "$out" ]; do + out=$(pidof lt-$RELAYD_BIN) + sleep 0.5 + done +fi + +# Kill the sessiond +PID_SESSIOND=`pidof lt-$SESSIOND_BIN` +kill $PID_SESSIOND >/dev/null 2>&1 +if [ $? -eq 1 ]; then + echo "Kill sessiond daemon" + exit 1 +else + out=1 + while [ -n "$out" ]; do + out=$(pidof lt-$SESSIOND_BIN) + sleep 0.5 + done +fi -- 2.34.1