tmf: Add a TmfSignalThrottler
authorAlexandre Montplaisir <alexmonthy@voxpopuli.im>
Tue, 26 Mar 2013 19:29:28 +0000 (15:29 -0400)
committerAlexandre Montplaisir <alexmonthy@voxpopuli.im>
Thu, 28 Mar 2013 14:56:33 +0000 (10:56 -0400)
Components can now decide to use a signal throttler for outgoing
signals. This offers a centralized way of controlling the rate
of signals (TimeRange sync signals come to mind).

Change-Id: Ia26cb12a87c6db2fae23892aff908715fe527755
Signed-off-by: Alexandre Montplaisir <alexmonthy@voxpopuli.im>
Reviewed-on: https://git.eclipse.org/r/11502
Tested-by: Hudson CI
Reviewed-by: Bernd Hufmann <bhufmann@gmail.com>
IP-Clean: Bernd Hufmann <bhufmann@gmail.com>
Tested-by: Bernd Hufmann <bhufmann@gmail.com>
org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/AllTmfCoreTests.java
org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/signal/AllTests.java [new file with mode: 0644]
org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/signal/TmfSignalThrottlerTest.java [new file with mode: 0644]
org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/signal/TmfSignalThrottler.java [new file with mode: 0644]

index d7e9ebe08edb9d90c72a410c0f9fd0d93526a103..7723e75a802de6f1aa44ea67af1640cc6d337820 100644 (file)
@@ -26,6 +26,7 @@ import org.junit.runners.Suite;
     org.eclipse.linuxtools.tmf.core.tests.ctfadaptor.AllTests.class,
     org.eclipse.linuxtools.tmf.core.tests.event.AllTests.class,
     org.eclipse.linuxtools.tmf.core.tests.request.AllTests.class,
+    org.eclipse.linuxtools.tmf.core.tests.signal.AllTests.class,
     org.eclipse.linuxtools.tmf.core.tests.statesystem.AllTests.class,
     org.eclipse.linuxtools.tmf.core.tests.statistics.AllTests.class,
     org.eclipse.linuxtools.tmf.core.tests.trace.AllTests.class,
diff --git a/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/signal/AllTests.java b/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/signal/AllTests.java
new file mode 100644 (file)
index 0000000..54ad4b9
--- /dev/null
@@ -0,0 +1,27 @@
+/*******************************************************************************
+ * Copyright (c) 2013 Ericsson
+ *
+ * All rights reserved. This program and the accompanying materials are
+ * made available under the terms of the Eclipse Public License v1.0 which
+ * accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *   Alexandre Montplaisir - Initial API and implementation
+ *******************************************************************************/
+
+package org.eclipse.linuxtools.tmf.core.tests.signal;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Signal tests
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    TmfSignalThrottlerTest.class
+})
+public class AllTests {
+
+}
diff --git a/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/signal/TmfSignalThrottlerTest.java b/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/signal/TmfSignalThrottlerTest.java
new file mode 100644 (file)
index 0000000..7f4f74d
--- /dev/null
@@ -0,0 +1,224 @@
+/*******************************************************************************
+ * Copyright (c) 2013 Ericsson
+ *
+ * All rights reserved. This program and the accompanying materials are
+ * made available under the terms of the Eclipse Public License v1.0 which
+ * accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *   Alexandre Montplaisir - Initial API and implementation
+ *******************************************************************************/
+
+package org.eclipse.linuxtools.tmf.core.tests.signal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.eclipse.linuxtools.tmf.core.component.TmfComponent;
+import org.eclipse.linuxtools.tmf.core.signal.TmfSignal;
+import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
+import org.eclipse.linuxtools.tmf.core.signal.TmfSignalThrottler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test suite for {@link TmfSignalThrottler}
+ *
+ * @author Alexandre Montplaisir
+ */
+public class TmfSignalThrottlerTest {
+
+    private MySender sender;
+    private MyListener listener;
+
+    /**
+     * Pre-test setup
+     */
+    @Before
+    public void setUp() {
+        sender = new MySender();
+        listener = new MyListener();
+    }
+
+    /**
+     * After-test cleanup
+     */
+    @After
+    public void tearDown() {
+        sender.dispose();
+        listener.dispose();
+    }
+
+    // ------------------------------------------------------------------------
+    // Test cases
+    // ------------------------------------------------------------------------
+
+    /**
+     * Test using only one throttler. Only one signal should go through.
+     */
+    @Test
+    public void testOneChannel() {
+        final MySignal sig1 = new MySignal(sender, 0);
+        final MySignal sig2 = new MySignal(sender, 0);
+        final MySignal sig3 = new MySignal(sender, 0);
+
+        synchronized(this) {
+            sender.sendSignal(sig1);
+            sender.sendSignal(sig2);
+            sender.sendSignal(sig3);
+        }
+
+        sleep(1000);
+
+        assertEquals(1, listener.nbReceived[0]);
+        assertEquals(0, listener.nbReceived[1]);
+        assertEquals(0, listener.nbReceived[2]);
+    }
+
+    /**
+     * Test using multiple throttlers in parrallel. Only one signal per
+     * throttler should go through.
+     */
+    @Test
+    public void testMultipleChannels() {
+        List<MySignal> signals = new ArrayList<MySignal>();
+        signals.add(new MySignal(sender, 0));
+        signals.add(new MySignal(sender, 0));
+        signals.add(new MySignal(sender, 0));
+
+        signals.add(new MySignal(sender, 1));
+        signals.add(new MySignal(sender, 1));
+        signals.add(new MySignal(sender, 1));
+
+        signals.add(new MySignal(sender, 2));
+        signals.add(new MySignal(sender, 2));
+        signals.add(new MySignal(sender, 2));
+
+        Collections.shuffle(signals); /* Every day */
+
+        synchronized(this) {
+            for (MySignal sig : signals) {
+                sender.sendSignal(sig);
+            }
+        }
+
+        sleep(2000);
+
+        for (int nb : listener.nbReceived) {
+            assertEquals(1, nb);
+        }
+    }
+
+    /**
+     * Test with one throttler, sending signals slowly. All three signals should
+     * go through.
+     */
+    @Test
+    public void testDelay() {
+        final MySignal sig1 = new MySignal(sender, 0);
+        final MySignal sig2 = new MySignal(sender, 0);
+        final MySignal sig3 = new MySignal(sender, 0);
+
+        sender.sendSignal(sig1);
+        sleep(1000);
+        sender.sendSignal(sig2);
+        sleep(1000);
+        sender.sendSignal(sig3);
+        sleep(1000);
+
+        assertEquals(3, listener.nbReceived[0]);
+    }
+
+    // ------------------------------------------------------------------------
+    // Helper methods
+    // ------------------------------------------------------------------------
+
+    private static void sleep(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    // Helper classes
+    // ------------------------------------------------------------------------
+
+    /**
+     * Signal sender
+     */
+    private class MySender extends TmfComponent {
+
+        private final TmfSignalThrottler[] throttlers;
+
+        MySender() {
+            super("MySender");
+            throttlers = new TmfSignalThrottler[] {
+                    new TmfSignalThrottler(this,  200),
+                    new TmfSignalThrottler(this,  500),
+                    new TmfSignalThrottler(this, 1000),
+            };
+        }
+
+        void sendSignal(MySignal signal) {
+            throttlers[signal.getChannel()].queue(signal);
+        }
+
+        @Override
+        public void dispose() {
+            super.dispose();
+            for (TmfSignalThrottler elem : throttlers) {
+                elem.dispose();
+            }
+        }
+    }
+
+    /**
+     * Signal listener
+     */
+    public class MyListener extends TmfComponent {
+
+        int[] nbReceived = { 0, 0, 0 };
+
+        /**
+         * Constructor. Needs to be public so TmfSignalHandler can see it.
+         */
+        public MyListener() {
+            super("MyListener");
+        }
+
+        /**
+         * Receive a signal.
+         *
+         * @param sig
+         *            Signal received
+         */
+        @TmfSignalHandler
+        public void receiveSignal(final MySignal sig) {
+            nbReceived[sig.getChannel()]++;
+        }
+    }
+
+    /**
+     * Signal object
+     */
+    private class MySignal extends TmfSignal {
+
+        private final int channel;
+
+        public MySignal(MySender source, int channel) {
+            super(source);
+            this.channel = channel;
+        }
+
+        public int getChannel() {
+            return channel;
+        }
+    }
+}
diff --git a/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/signal/TmfSignalThrottler.java b/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/signal/TmfSignalThrottler.java
new file mode 100644 (file)
index 0000000..c5c79ec
--- /dev/null
@@ -0,0 +1,103 @@
+/*******************************************************************************
+ * Copyright (c) 2013 Ericsson
+ *
+ * All rights reserved. This program and the accompanying materials are
+ * made available under the terms of the Eclipse Public License v1.0 which
+ * accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *   Alexandre Montplaisir - Initial API and implementation
+ *******************************************************************************/
+
+package org.eclipse.linuxtools.tmf.core.signal;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.eclipse.linuxtools.tmf.core.component.ITmfComponent;
+import org.eclipse.linuxtools.tmf.core.component.TmfComponent;
+
+/**
+ * "Buffer" between a TmfComponent and the signal manager. You can use this if
+ * you want to throttle the amount of signals your component will send.
+ *
+ * It works by specifying a delay, then calling {@link #queue}. The signals will
+ * only be really sent if no other call to {@link #queue} happens within $delay
+ * milliseconds afterwards. This guarantees that only the *last* signal is
+ * actually broadcasted.
+ *
+ * Note that this class does not discriminate for signal types, sources, or
+ * whatever. If you want to throttle different signals in different ways, you
+ * can use multiple signal throttlers in your component and call them
+ * accordingly.
+ *
+ * @author Alexandre Montplaisir
+ * @since 2.0
+ */
+public class TmfSignalThrottler {
+
+    private final ITmfComponent fComponent;
+    private final long fDelay;
+    private final Timer fTimer;
+    private TimerTask fCurrentTask;
+
+    /**
+     * Constructor
+     *
+     * @param component
+     *            The source component of the signals
+     * @param delay
+     *            Time to wait before actually sending signals (in ms)
+     */
+    public TmfSignalThrottler(ITmfComponent component, long delay) {
+        this.fComponent = component;
+        this.fDelay = delay;
+        this.fTimer = new Timer();
+
+        /*
+         * Initialize currentTask to something, so we don't have to do a null
+         * check every time
+         */
+        fCurrentTask = new TimerTask() { @Override public void run() {} };
+    }
+
+    /**
+     * Queue a signal for sending. It will only be forward to the centralized
+     * signal handler if 'delay' elapses without another signal being sent
+     * through this method.
+     *
+     * You call this instead of calling {@link TmfComponent#broadcast}.
+     *
+     * @param signal
+     *            The signal to queue for broadcasting
+     */
+    public synchronized void queue(TmfSignal signal) {
+        fCurrentTask.cancel();
+        fCurrentTask = new BroadcastRequest(signal);
+        fTimer.schedule(fCurrentTask, fDelay);
+    }
+
+    /**
+     * Dispose method. Will prevent any pending signal from being sent, and this
+     * throttler from be used again.
+     */
+    public synchronized void dispose() {
+        fTimer.cancel();
+        fTimer.purge();
+    }
+
+    private class BroadcastRequest extends TimerTask {
+
+        private final TmfSignal signal;
+
+        BroadcastRequest(TmfSignal signal) {
+            this.signal = signal;
+        }
+
+        @Override
+        public void run() {
+            fComponent.broadcast(signal);
+        }
+    }
+}
This page took 0.030681 seconds and 5 git commands to generate.