Fix concurrency issues while overwriting tracefiles in live
author Julien Desfossez <jdesfossez@efficios.com>
Thu, 14 Nov 2013 17:51:30 +0000 (12:51 -0500)
committer David Goulet <dgoulet@efficios.com>
Thu, 14 Nov 2013 17:55:29 +0000 (12:55 -0500)
Introduce a new lock to set the abort_flag in the viewer stream. This
lock is nested inside the tracefile rotation of the writer and is
taken before reading the tracefile on the viewer side. That way, the
viewer side only has to check this flag to know if it is safe to read.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-relayd/live.c
src/bin/lttng-relayd/lttng-relayd.h
src/bin/lttng-relayd/main.c
tests/fast_regression
tests/regression/tools/live/Makefile.am
tests/regression/tools/live/test_ust_tracefile_count [new file with mode: 0755]

index e51ff5cdf53495d4848a2867cfc952e9f2534bc5..480c459ce6a49724f54d1b6ef71c0b5a6d458a1b 100644 (file)
@@ -801,23 +801,34 @@ int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream,
                }
        }
        viewer_stream->tracefile_count_current = tracefile_id;
-       pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
 
        if (viewer_stream->abort_flag == 0) {
-               ret = close(viewer_stream->index_read_fd);
-               if (ret < 0) {
-                       PERROR("close index file");
+               if (viewer_stream->index_read_fd > 0) {
+                       ret = close(viewer_stream->index_read_fd);
+                       if (ret < 0) {
+                               PERROR("close index file %d",
+                                               viewer_stream->index_read_fd);
+                       }
+                       viewer_stream->index_read_fd = -1;
                }
-               ret = close(viewer_stream->read_fd);
-               if (ret < 0) {
-                       PERROR("close tracefile");
+               if (viewer_stream->read_fd > 0) {
+                       ret = close(viewer_stream->read_fd);
+                       if (ret < 0) {
+                               PERROR("close tracefile %d",
+                                               viewer_stream->read_fd);
+                       }
+                       viewer_stream->read_fd = -1;
                }
        } else {
                viewer_stream->abort_flag = 0;
        }
 
+       viewer_stream->index_read_fd = -1;
        viewer_stream->read_fd = -1;
 
+       if (stream) {
+               pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
+       }
        ret = open_index(viewer_stream);
        if (ret < 0) {
                goto error;
@@ -1125,25 +1136,26 @@ int viewer_get_next_index(struct relay_command *cmd,
                                goto end_unlock;
                        }
                }
-               if (rstream->beacon_ts_end != -1ULL &&
-                               vstream->last_sent_index == rstream->total_index_received) {
-                       viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE);
-                       viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
-                       goto send_reply;
-               }
-               /*
-                * Reader and writer are working in the same tracefile, so we care
-                * about the number of index received and sent. Otherwise, we read
-                * up to EOF.
-                */
                pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
-               if (rstream->tracefile_count_current == vstream->tracefile_count_current
-                               && rstream->total_index_received <= vstream->last_sent_index
-                               && !vstream->close_write_flag) {
-                       pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
-                       /* No new index to send, retry later. */
-                       viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
-                       goto send_reply;
+               if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
+                       if (rstream->beacon_ts_end != -1ULL &&
+                               vstream->last_sent_index == rstream->total_index_received) {
+                               viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE);
+                               viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
+                               pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+                               goto send_reply;
+                       /*
+                        * Reader and writer are working in the same tracefile, so we care
+                        * about the number of index received and sent. Otherwise, we read
+                        * up to EOF.
+                        */
+                       } else if (rstream->total_index_received <= vstream->last_sent_index
+                                       && !vstream->close_write_flag) {
+                               pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+                               /* No new index to send, retry later. */
+                               viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+                               goto send_reply;
+                       }
                }
                pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
        } else if (!rstream && vstream->close_write_flag &&
@@ -1161,8 +1173,23 @@ int viewer_get_next_index(struct relay_command *cmd,
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
        }
 
+       pthread_mutex_lock(&vstream->overwrite_lock);
+       if (vstream->abort_flag) {
+               /*
+                * The file is being overwritten by the writer, we cannot
+                * use it.
+                */
+               viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+               pthread_mutex_unlock(&vstream->overwrite_lock);
+               ret = rotate_viewer_stream(vstream, rstream);
+               if (ret < 0) {
+                       goto end_unlock;
+               }
+               goto send_reply;
+       }
        ret = lttng_read(vstream->index_read_fd, &packet_index,
                        sizeof(packet_index));
+       pthread_mutex_unlock(&vstream->overwrite_lock);
        if (ret < sizeof(packet_index)) {
                /*
                 * The tracefile is closed in write, so we read up to EOF.
@@ -1175,17 +1202,9 @@ int viewer_get_next_index(struct relay_command *cmd,
                                goto end_unlock;
                        }
                } else {
-                       /*
-                        * If the read fd was closed by the streaming side, the
-                        * abort_flag will be set to 1, otherwise it is an error.
-                        */
-                       if (vstream->abort_flag != 1) {
-                               PERROR("Relay reading index file");
-                               viewer_index.status = htobe32(VIEWER_INDEX_ERR);
-                               goto send_reply;
-                       } else {
-                               viewer_index.status = htobe32(VIEWER_INDEX_HUP);
-                       }
+                       PERROR("Relay reading index file %d",
+                                       vstream->index_read_fd);
+                       viewer_index.status = htobe32(VIEWER_INDEX_ERR);
                }
                goto send_reply;
        } else {
index 8039a7ec703d01f4672d32abcc9448ed49fb017a..0af8e0f1d963199b5a6bb82c28b244a33daa6b6c 100644 (file)
@@ -166,6 +166,11 @@ struct relay_viewer_stream {
        struct lttng_ht_node_u64 stream_n;
        struct rcu_head rcu_node;
        struct ctf_trace *ctf_trace;
+       /*
+        * This lock blocks only when the writer is about to start overwriting
+        * a file currently read by the reader.
+        */
+       pthread_mutex_t overwrite_lock;
        /* Information telling us if the stream is a metadata stream. */
        unsigned int metadata_flag:1;
        /*
index 5782175e6e7f15b57753cc29dedebd54db7a51da..70a1948c328cc690e39e4add4d67f4a2a8f2fe44 100644 (file)
@@ -849,7 +849,10 @@ static void destroy_stream(struct relay_stream *stream)
                 * lookup failure on the live thread side of a stream indicates
                 * that the viewer stream index received value should be used.
                 */
+               pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
                vstream->total_index_received = stream->total_index_received;
+               vstream->close_write_flag = 1;
+               pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
        }
 
        /* Cleanup index of that stream. */
@@ -2065,18 +2068,9 @@ int relay_process_data(struct relay_command *cmd)
                         * currently using and let it handle the fault.
                         */
                        if (vstream->tracefile_count_current == new_id) {
+                               pthread_mutex_lock(&vstream->overwrite_lock);
                                vstream->abort_flag = 1;
-                               vstream->close_write_flag = 1;
-
-                               ret = close(vstream->read_fd);
-                               if (ret < 0) {
-                                       PERROR("close index");
-                               }
-
-                               ret = close(vstream->index_read_fd);
-                               if (ret < 0) {
-                                       PERROR("close tracefile");
-                               }
+                               pthread_mutex_unlock(&vstream->overwrite_lock);
                                DBG("Streaming side setting abort_flag on stream %s_%lu\n",
                                                stream->channel_name, new_id);
                        } else if (vstream->tracefile_count_current ==
@@ -2094,6 +2088,7 @@ int relay_process_data(struct relay_command *cmd)
                                stream->tracefile_size, stream->tracefile_count,
                                relayd_uid, relayd_gid, stream->fd,
                                &(stream->tracefile_count_current), &stream->fd);
+               stream->total_index_received = 0;
                pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
                if (ret < 0) {
                        ERR("Rotating stream output file");
index 678570c86fe7d193a967ec1654bd4e31e98dd9ee..6c9fa3986b58e36f0979a4199acecb5dac5130bc 100644 (file)
@@ -3,6 +3,7 @@ regression/tools/filtering/test_unsupported_op
 regression/tools/filtering/test_valid_filter
 regression/tools/streaming/test_ust
 regression/tools/live/test_ust
+regression/tools/live/test_ust_tracefile_count
 regression/tools/live/test_lttng_ust
 regression/tools/tracefile-limits/test_tracefile_count
 regression/tools/tracefile-limits/test_tracefile_size
index d8a952524a838696ac19aa2547513cff33b023e3..fdf9d3adee53a6b65b956b65bb31ba31e0bae7eb 100644 (file)
@@ -25,7 +25,7 @@ noinst_PROGRAMS = live_test
 EXTRA_DIST = live_test test_kernel
 
 if HAVE_LIBLTTNG_UST_CTL
-EXTRA_DIST += test_ust
+EXTRA_DIST += test_ust test_ust_tracefile_count
 endif
 
 live_test_SOURCES = live_test.c
diff --git a/tests/regression/tools/live/test_ust_tracefile_count b/tests/regression/tools/live/test_ust_tracefile_count
new file mode 100755 (executable)
index 0000000..464f4aa
--- /dev/null
@@ -0,0 +1,122 @@
+#!/bin/bash
+#
+# Copyright (C) - 2013 Julien Desfossez <julien.desfossez@efficios.com>
+#                      David Goulet <dgoulet@efficios.com>
+#
+# This library is free software; you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+
+TEST_DESC="Live - User space tracing with tracefile_count"
+
+CURDIR=$(dirname $0)/  # directory containing this script
+TESTDIR=$CURDIR/../../../  # three levels above the script directory
+SESSIOND_BIN="lttng-sessiond"
+RELAYD_BIN="lttng-relayd"
+LTTNG_BIN="lttng"
+BABELTRACE_BIN="babeltrace"  # not referenced by this script itself
+NR_ITER=1  # first argument passed to the test application
+NR_USEC_WAIT=1  # second argument passed to the test application
+DELAY_USEC=2000000  # value given to "lttng create --live"
+TESTAPP_PATH="$TESTDIR/utils/testapp"
+TESTAPP_NAME="gen-ust-events"
+TESTAPP_BIN="$TESTAPP_PATH/$TESTAPP_NAME/$TESTAPP_NAME"
+
+SESSION_NAME="live"
+EVENT_NAME="tp:tptest"
+
+TRACE_PATH=$(mktemp -d)  # relayd output directory (-o); removed by clean_live_tracing
+
+DIR=$(readlink -f $TESTDIR)  # canonical form of TESTDIR, used to launch the daemons
+
+source $TESTDIR/utils/utils.sh
+
+echo "$TEST_DESC"
+
+function setup_live_tracing()
+{
+       # Create session with default path
+       $TESTDIR/../src/bin/lttng/$LTTNG_BIN create $SESSION_NAME --live $DELAY_USEC \
+               -U net://localhost >/dev/null 2>&1
+
+       $TESTDIR/../src/bin/lttng/$LTTNG_BIN enable-channel --subbuf-size 16k -C 16k -W 10 -u chan1
+       $TESTDIR/../src/bin/lttng/$LTTNG_BIN enable-event "$EVENT_NAME" -s $SESSION_NAME -u -c chan1 >/dev/null 2>&1
+       $TESTDIR/../src/bin/lttng/$LTTNG_BIN start $SESSION_NAME >/dev/null 2>&1
+}
+
+function clean_live_tracing()
+{
+       $TESTDIR/../src/bin/lttng/$LTTNG_BIN stop $SESSION_NAME >/dev/null 2>&1
+       $TESTDIR/../src/bin/lttng/$LTTNG_BIN destroy $SESSION_NAME >/dev/null 2>&1
+       rm -rf $TRACE_PATH
+}
+
+if [ -z $(pidof lt-$SESSIOND_BIN) ]; then
+       $DIR/../src/bin/lttng-sessiond/$SESSIOND_BIN --daemonize --quiet --consumerd32-path="$DIR/../src/bin/lttng-consumerd/lttng-consumerd" --consumerd64-path="$DIR/../src/bin/lttng-consumerd/lttng-consumerd"
+       if [ $? -eq 1 ]; then
+               echo "Fail to start lttng-sessiond"
+               exit 1
+       fi
+       # Wait for sessiond to bootstrap
+       sleep 2
+fi
+
+opt="-o $TRACE_PATH"
+if [ -z $(pidof lt-$RELAYD_BIN) ]; then
+       $DIR/../src/bin/lttng-relayd/$RELAYD_BIN $opt >/dev/null 2>&1 &
+       if [ $? -eq 1 ]; then
+               echo "Fail to start lttng-relayd (opt: $opt)"
+               return 1
+       fi
+fi
+
+setup_live_tracing
+
+# Run app in background
+$TESTAPP_BIN $NR_ITER $NR_USEC_WAIT >/dev/null 2>&1 &
+# Wait for app to complete
+while [ -n "$(pidof $TESTAPP_NAME)" ]; do
+       sleep 0.5
+done
+
+# Start the live test
+$TESTDIR/regression/tools/live/live_test
+
+clean_live_tracing
+
+# Kill the relayd
+PID_RELAYD=`pidof lt-$RELAYD_BIN`
+kill $PID_RELAYD >/dev/null 2>&1
+if [ $? -eq 1 ]; then
+       echo "Kill lttng-relayd (pid: $PID_RELAYD)"
+       exit 1
+else
+       out=1
+       while [ -n "$out" ]; do
+               out=$(pidof lt-$RELAYD_BIN)
+               sleep 0.5
+       done
+fi
+
+# Kill the sessiond
+PID_SESSIOND=`pidof lt-$SESSIOND_BIN`
+kill $PID_SESSIOND >/dev/null 2>&1
+if [ $? -eq 1 ]; then
+       echo "Kill sessiond daemon"
+       exit 1
+else
+       out=1
+       while [ -n "$out" ]; do
+               out=$(pidof lt-$SESSIOND_BIN)
+               sleep 0.5
+       done
+fi
This page took 0.032075 seconds and 5 git commands to generate.