import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
import org.eclipse.tracecompass.statesystem.core.ITmfStateSystem;
import org.eclipse.tracecompass.statesystem.core.ITmfStateSystemBuilder;
import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
*/
public abstract class AbstractTmfStateProvider implements ITmfStateProvider {
- private static final int DEFAULT_EVENTS_QUEUE_SIZE = 10000;
+ private static final int DEFAULT_EVENTS_QUEUE_SIZE = 127;
+ private static final int DEFAULT_EVENTS_CHUNK_SIZE = 127;
private final ITmfTrace fTrace;
- private final BlockingQueue<ITmfEvent> fEventsQueue;
+ private final BufferedBlockingQueue<ITmfEvent> fEventsQueue;
private final Thread fEventHandlerThread;
private boolean fStateSystemAssigned;
*/
public AbstractTmfStateProvider(ITmfTrace trace, String id) {
fTrace = trace;
- fEventsQueue = new ArrayBlockingQueue<>(DEFAULT_EVENTS_QUEUE_SIZE);
+ fEventsQueue = new BufferedBlockingQueue<>(DEFAULT_EVENTS_QUEUE_SIZE, DEFAULT_EVENTS_CHUNK_SIZE);
fStateSystemAssigned = false;
fEventHandlerThread = new Thread(new EventProcessor(), id + " Event Handler"); //$NON-NLS-1$
/* Insert a null event in the queue to stop the event handler's thread. */
try {
fEventsQueue.put(END_EVENT);
+ fEventsQueue.flushInputBuffer();
fEventHandlerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
/* Insert the event we're received into the events queue */
ITmfEvent curEvent = event;
- try {
- fEventsQueue.put(curEvent);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ fEventsQueue.put(curEvent);
}
/**
*/
try {
fEventsQueue.put(EMPTY_QUEUE_EVENT);
+ fEventsQueue.flushInputBuffer();
while (!fEventsQueue.isEmpty()) {
Thread.sleep(100);
}
System.err.println("Cannot run event manager without assigning a target state system first!"); //$NON-NLS-1$
return;
}
- @NonNull ITmfEvent event;
- try {
- /*
- * We never insert null in the queue. Cannot be checked at
- * compile-time until Java 8 annotations...
- */
- event = checkNotNull(fEventsQueue.take());
- /* This is a singleton, we want to do != instead of !x.equals */
- while (event != END_EVENT) {
- if (event == EMPTY_QUEUE_EVENT) {
- /* Synchronization event, should be ignored */
- event = checkNotNull(fEventsQueue.take());
- continue;
- }
- currentEvent = event;
- eventHandle(event);
+
+ /*
+ * We never insert null in the queue. Cannot be checked at
+ * compile-time until Java 8 annotations...
+ */
+ @NonNull ITmfEvent event = checkNotNull(fEventsQueue.take());
+ /* This is a singleton, we want to do != instead of !x.equals */
+ while (event != END_EVENT) {
+ if (event == EMPTY_QUEUE_EVENT) {
+ /* Synchronization event, should be ignored */
event = checkNotNull(fEventsQueue.take());
+ continue;
}
- /* We've received the last event, clean up */
- closeStateSystem();
-
- } catch (InterruptedException e) {
- /* We've been interrupted abnormally */
- System.out.println("Event handler interrupted!"); //$NON-NLS-1$
- e.printStackTrace();
+ currentEvent = event;
+ eventHandle(event);
+ event = checkNotNull(fEventsQueue.take());
}
+ /* We've received the last event, clean up */
+ closeStateSystem();
}
private void closeStateSystem() {