From: Matthew Khouzam Date: Wed, 22 Apr 2015 18:37:58 +0000 (-0400) Subject: tmf.core: move AbstractTmfStateProvider to new BufferedBlockingQueue X-Git-Url: http://git.efficios.com/?a=commitdiff_plain;h=423cf6a48470379c8fb29ffb75413594bfa06fa9;p=deliverable%2Ftracecompass.git tmf.core: move AbstractTmfStateProvider to new BufferedBlockingQueue This yeilds a performance boost, especially on slower io systems. Change-Id: I04dd89d2237c80bd07c00514fd83f0d4f31e13dc Signed-off-by: Matthew Khouzam Reviewed-on: https://git.eclipse.org/r/46279 Reviewed-by: Hudson CI Reviewed-by: Alexandre Montplaisir Tested-by: Alexandre Montplaisir --- diff --git a/org.eclipse.tracecompass.tmf.core/src/org/eclipse/tracecompass/tmf/core/statesystem/AbstractTmfStateProvider.java b/org.eclipse.tracecompass.tmf.core/src/org/eclipse/tracecompass/tmf/core/statesystem/AbstractTmfStateProvider.java index 3783450d23..a4bf28a2b2 100644 --- a/org.eclipse.tracecompass.tmf.core/src/org/eclipse/tracecompass/tmf/core/statesystem/AbstractTmfStateProvider.java +++ b/org.eclipse.tracecompass.tmf.core/src/org/eclipse/tracecompass/tmf/core/statesystem/AbstractTmfStateProvider.java @@ -14,11 +14,9 @@ package org.eclipse.tracecompass.tmf.core.statesystem; import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; - import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; +import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue; import org.eclipse.tracecompass.statesystem.core.ITmfStateSystem; import org.eclipse.tracecompass.statesystem.core.ITmfStateSystemBuilder; import org.eclipse.tracecompass.tmf.core.event.ITmfEvent; @@ -41,10 +39,11 @@ import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace; */ public abstract class AbstractTmfStateProvider implements ITmfStateProvider { - private static final int DEFAULT_EVENTS_QUEUE_SIZE = 10000; + private static final int DEFAULT_EVENTS_QUEUE_SIZE = 127; + private static final int DEFAULT_EVENTS_CHUNK_SIZE = 127; private final ITmfTrace fTrace; - private final BlockingQueue fEventsQueue; + private final BufferedBlockingQueue fEventsQueue; private final Thread fEventHandlerThread; private boolean fStateSystemAssigned; @@ -62,7 +61,7 @@ public abstract class AbstractTmfStateProvider implements ITmfStateProvider { */ public AbstractTmfStateProvider(ITmfTrace trace, String id) { fTrace = trace; - fEventsQueue = new ArrayBlockingQueue<>(DEFAULT_EVENTS_QUEUE_SIZE); + fEventsQueue = new BufferedBlockingQueue<>(DEFAULT_EVENTS_QUEUE_SIZE, DEFAULT_EVENTS_CHUNK_SIZE); fStateSystemAssigned = false; fEventHandlerThread = new Thread(new EventProcessor(), id + " Event Handler"); //$NON-NLS-1$ @@ -104,6 +103,7 @@ public abstract class AbstractTmfStateProvider implements ITmfStateProvider { /* Insert a null event in the queue to stop the event handler's thread. */ try { fEventsQueue.put(END_EVENT); + fEventsQueue.flushInputBuffer(); fEventHandlerThread.join(); } catch (InterruptedException e) { e.printStackTrace(); @@ -122,11 +122,7 @@ public abstract class AbstractTmfStateProvider implements ITmfStateProvider { /* Insert the event we're received into the events queue */ ITmfEvent curEvent = event; - try { - fEventsQueue.put(curEvent); - } catch (InterruptedException e) { - e.printStackTrace(); - } + fEventsQueue.put(curEvent); } /** @@ -140,6 +136,7 @@ public abstract class AbstractTmfStateProvider implements ITmfStateProvider { */ try { fEventsQueue.put(EMPTY_QUEUE_EVENT); + fEventsQueue.flushInputBuffer(); while (!fEventsQueue.isEmpty()) { Thread.sleep(100); } @@ -187,33 +184,26 @@ public abstract class AbstractTmfStateProvider implements ITmfStateProvider { System.err.println("Cannot run event manager without assigning a target state system first!"); //$NON-NLS-1$ return; } - @NonNull ITmfEvent event; - try { - /* - * We never insert null in the queue. Cannot be checked at - * compile-time until Java 8 annotations... - */ - event = checkNotNull(fEventsQueue.take()); - /* This is a singleton, we want to do != instead of !x.equals */ - while (event != END_EVENT) { - if (event == EMPTY_QUEUE_EVENT) { - /* Synchronization event, should be ignored */ - event = checkNotNull(fEventsQueue.take()); - continue; - } - currentEvent = event; - eventHandle(event); + + /* + * We never insert null in the queue. Cannot be checked at + * compile-time until Java 8 annotations... + */ + @NonNull ITmfEvent event = checkNotNull(fEventsQueue.take()); + /* This is a singleton, we want to do != instead of !x.equals */ + while (event != END_EVENT) { + if (event == EMPTY_QUEUE_EVENT) { + /* Synchronization event, should be ignored */ event = checkNotNull(fEventsQueue.take()); + continue; } - /* We've received the last event, clean up */ - closeStateSystem(); - - } catch (InterruptedException e) { - /* We've been interrupted abnormally */ - System.out.println("Event handler interrupted!"); //$NON-NLS-1$ - e.printStackTrace(); + currentEvent = event; + eventHandle(event); + event = checkNotNull(fEventsQueue.take()); } + /* We've received the last event, clean up */ + closeStateSystem(); } private void closeStateSystem() {