1 /*******************************************************************************
2 * Copyright (c) 2012, 2015 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Alexandre Montplaisir - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.tracecompass
.tmf
.core
.statesystem
;
15 import org
.eclipse
.core
.runtime
.ISafeRunnable
;
16 import org
.eclipse
.core
.runtime
.SafeRunner
;
17 import org
.eclipse
.jdt
.annotation
.Nullable
;
18 import org
.eclipse
.tracecompass
.common
.core
.collect
.BufferedBlockingQueue
;
19 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.Activator
;
20 import org
.eclipse
.tracecompass
.statesystem
.core
.ITmfStateSystem
;
21 import org
.eclipse
.tracecompass
.statesystem
.core
.ITmfStateSystemBuilder
;
22 import org
.eclipse
.tracecompass
.tmf
.core
.event
.ITmfEvent
;
23 import org
.eclipse
.tracecompass
.tmf
.core
.event
.TmfEvent
;
24 import org
.eclipse
.tracecompass
.tmf
.core
.trace
.ITmfContext
;
25 import org
.eclipse
.tracecompass
.tmf
.core
.trace
.ITmfTrace
;
27 import com
.google
.common
.annotations
.VisibleForTesting
;
30 * Instead of implementing ITmfStateProvider directly, one can extend this class, which
31 * defines a lot of the common functions of a state provider.
33 * It will handle the state-system-processing in a separate thread, which is
34 * normally not a bad idea for traces of some size.
36 * processEvent() is replaced with eventHandle(), so that all the multi-thread
37 * logic is abstracted away.
39 * @author Alexandre Montplaisir
41 public abstract class AbstractTmfStateProvider
implements ITmfStateProvider
{
43 private static final int DEFAULT_EVENTS_QUEUE_SIZE
= 127;
44 private static final int DEFAULT_EVENTS_CHUNK_SIZE
= 127;
46 private final ITmfTrace fTrace
;
47 private final BufferedBlockingQueue
<ITmfEvent
> fEventsQueue
;
48 private final Thread fEventHandlerThread
;
50 private boolean fStateSystemAssigned
;
51 /** State system in which to insert the state changes */
52 private @Nullable ITmfStateSystemBuilder fSS
= null;
53 private @Nullable Throwable fFailureCause
= null;
55 /* The last safe time at which this state provider can be queried */
56 private volatile long fSafeTime
;
59 * An exception propagation runnable. If an exception occurred in Event
60 * Processor thread, this field should be updated so that the "main" thread
61 * will propagate the exception
63 private Runnable fPropagateExceptions
= () -> {
64 // Do nothing, a new Runnable will be defined if exceptions occur in the
/**
 * Instantiate a new state provider plugin, using the default queue size and
 * chunk size for the event processing queue.
 *
 * @param trace
 *            The trace this state provider is created for
 * @param id
 *            Name given to this state change input. Only used internally.
 */
public AbstractTmfStateProvider(ITmfTrace trace, String id) {
    this(trace, id, DEFAULT_EVENTS_QUEUE_SIZE, DEFAULT_EVENTS_CHUNK_SIZE);
}
/**
 * Instantiate a new state provider. This constructor allows to fine-tune
 * the size of the event processing queue. This can be useful to unit tests.
 *
 * @param trace
 *            The trace this state provider is created for. Its start time is
 *            used to initialize the latest safe time.
 * @param id
 *            Name given to this state change input. Only used internally, as
 *            part of the event handler thread's name.
 * @param queueSize
 *            The size of the queue, a.k.a the number of chunks that fit
 *            into the buffered queue. Must be strictly positive.
 * @param chunkSize
 *            The number of events that fit inside a single chunk of the
 *            queue. Must be strictly positive.
 * @throws IllegalArgumentException
 *             If queueSize or chunkSize is zero or negative
 */
protected AbstractTmfStateProvider(ITmfTrace trace, String id, int queueSize, int chunkSize) {
    if (queueSize <= 0 || chunkSize <= 0) {
        /* Zero is rejected too, so the message must not claim "negative" only */
        throw new IllegalArgumentException("Cannot have non-positive sized buffer" + //$NON-NLS-1$
                formatError("queueSize", queueSize) + //$NON-NLS-1$
                formatError("chunkSize", chunkSize)); //$NON-NLS-1$
    }
    fTrace = trace;
    fEventsQueue = new BufferedBlockingQueue<>(queueSize, chunkSize);
    fStateSystemAssigned = false;
    /*
     * Set the safe time to just before the trace start: the analysis has not
     * processed anything yet.
     */
    fSafeTime = trace.getStartTime().toNanos() - 1;
    /* The thread is only created here; assignTargetStateSystem() starts it */
    fEventHandlerThread = new Thread(() -> SafeRunner.run(new EventProcessor()), id + " Event Handler"); //$NON-NLS-1$
}
114 private static String
formatError(String name
, int value
) {
115 return (value
<= 0) ?
" " + name
+ " = " + value
: ""; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
119 * Get the state system builder of this provider (to insert states in).
121 * @return The state system object to be filled
123 protected @Nullable ITmfStateSystemBuilder
getStateSystemBuilder() {
128 public ITmfTrace
getTrace() {
133 public long getStartTime() {
134 return fTrace
.getStartTime().toNanos();
141 public long getLatestSafeTime() {
146 public void assignTargetStateSystem(ITmfStateSystemBuilder ssb
) {
148 fStateSystemAssigned
= true;
149 fEventHandlerThread
.start();
153 public @Nullable ITmfStateSystem
getAssignedStateSystem() {
158 public void dispose() {
160 * Insert a null event in the queue to stop the event handler's thread.
163 fEventsQueue
.put(END_EVENT
);
164 fEventsQueue
.flushInputBuffer();
165 fEventHandlerThread
.join();
166 } catch (InterruptedException e
) {
167 Activator
.logError("Error disposing state provider", e
); //$NON-NLS-1$
169 fStateSystemAssigned
= false;
174 public void processEvent(ITmfEvent event
) {
175 /* Make sure the target state system has been assigned */
176 if (!fStateSystemAssigned
) {
177 throw new IllegalStateException("Cannot process event without a target state system. ID: " + getClass().getSimpleName()); //$NON-NLS-1$
179 fPropagateExceptions
.run();
181 /* Insert the event we're received into the events queue */
182 ITmfEvent curEvent
= event
;
183 fEventsQueue
.put(curEvent
);
190 public void fail(Throwable cause
) {
191 fFailureCause
= cause
;
198 public @Nullable Throwable
getFailureCause() {
199 return fFailureCause
;
203 * Block the caller until the events queue is empty.
205 public void waitForEmptyQueue() {
207 * We will first insert a dummy event that is guaranteed to not modify
208 * the state. That way, when that event leaves the queue, we will know
209 * for sure that the state system processed the preceding real event.
212 fEventsQueue
.put(EMPTY_QUEUE_EVENT
);
213 fEventsQueue
.flushInputBuffer();
214 while (!fEventsQueue
.isEmpty()) {
217 } catch (InterruptedException e
) {
222 // ------------------------------------------------------------------------
223 // Special event types
224 // ------------------------------------------------------------------------
227 * Fake event indicating the build is over, and the provider should close
229 private static class EndEvent
extends TmfEvent
{
231 super(null, ITmfContext
.UNKNOWN_RANK
, null, null, null);
235 /** Fake event indicating we want to clear the current queue */
236 private static class EmptyQueueEvent
extends TmfEvent
{
237 public EmptyQueueEvent() {
238 super(null, ITmfContext
.UNKNOWN_RANK
, null, null, null);
242 private static final EndEvent END_EVENT
= new EndEvent();
243 private static final EmptyQueueEvent EMPTY_QUEUE_EVENT
= new EmptyQueueEvent();
245 // ------------------------------------------------------------------------
247 // ------------------------------------------------------------------------
250 * This is the runner class for the second thread, which will take the
251 * events from the queue and pass them through the state system.
253 private class EventProcessor
implements ISafeRunnable
{
255 private @Nullable ITmfEvent currentEvent
;
256 private boolean fDone
= false;
260 if (!fStateSystemAssigned
) {
261 Activator
.logError("Cannot run event manager without assigning a target state system first!"); //$NON-NLS-1$
266 * We never insert null in the queue. Cannot be checked at
267 * compile-time until Java 8 annotations...
269 ITmfEvent event
= fEventsQueue
.take();
270 /* This is a singleton, we want to do != instead of !x.equals */
271 while (event
!= END_EVENT
) {
272 if (event
== EMPTY_QUEUE_EVENT
) {
273 /* Synchronization event, should be ignored */
274 event
= fEventsQueue
.take();
277 currentEvent
= event
;
278 fSafeTime
= event
.getTimestamp().toNanos() - 1;
280 event
= fEventsQueue
.take();
283 /* We've received the last event, clean up */
288 private void closeStateSystem() {
289 ITmfEvent event
= currentEvent
;
290 final long endTime
= (event
== null) ?
0 : event
.getTimestamp().toNanos();
293 fSS
.closeHistory(endTime
);
298 public void handleException(@Nullable Throwable exception
) {
299 // Update the propagation runnable
300 final RuntimeException rException
= (exception
instanceof RuntimeException
) ?
(RuntimeException
) exception
: new RuntimeException("Error in threaded state history backend", exception
); //$NON-NLS-1$
302 fPropagateExceptions
= () -> {
303 // This exception should be caught by the thread that does the
304 // insertions and trigger the cancellation mechanism
309 * The last event was already processed, the exception was
310 * thrown from the closing of the state system, just return
315 ITmfEvent event
= fEventsQueue
.take();
316 while (event
!= END_EVENT
) {
317 if (event
== EMPTY_QUEUE_EVENT
) {
318 /* Synchronization event, should be ignored */
319 event
= fEventsQueue
.take();
322 event
= fEventsQueue
.take();
325 /* We've received the last event, clean up */
330 // ------------------------------------------------------------------------
332 // ------------------------------------------------------------------------
335 * Handle the given event and send the appropriate state transitions into
336 * the the state system.
338 * This is basically the same thing as IStateChangeInput.processEvent(),
339 * except here processEvent() and eventHandle() are run in two different
340 * threads (and the AbstractStateChangeInput takes care of processEvent()
344 * The event to process. If you need a specific event type, you
345 * should check for its instance right at the beginning.
347 protected abstract void eventHandle(ITmfEvent event
);