tmf: remove deprecated methods from tmf
[deliverable/tracecompass.git] / tmf / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / tmf / core / statesystem / AbstractTmfStateProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2012, 2015 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Alexandre Montplaisir - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.tracecompass.tmf.core.statesystem;
14
15 import org.eclipse.core.runtime.ISafeRunnable;
16 import org.eclipse.core.runtime.SafeRunner;
17 import org.eclipse.jdt.annotation.Nullable;
18 import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
19 import org.eclipse.tracecompass.internal.tmf.core.Activator;
20 import org.eclipse.tracecompass.statesystem.core.ITmfStateSystem;
21 import org.eclipse.tracecompass.statesystem.core.ITmfStateSystemBuilder;
22 import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
23 import org.eclipse.tracecompass.tmf.core.event.TmfEvent;
24 import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
25 import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace;
26
27 import com.google.common.annotations.VisibleForTesting;
28
29 /**
30 * Instead of using IStateChangeInput directly, one can extend this class, which
31 * defines a lot of the common functions of the state change input plugin.
32 *
33 * It will handle the state-system-processing in a separate thread, which is
34 * normally not a bad idea for traces of some size.
35 *
36 * processEvent() is replaced with eventHandle(), so that all the multi-thread
37 * logic is abstracted away.
38 *
39 * @author Alexandre Montplaisir
40 */
41 public abstract class AbstractTmfStateProvider implements ITmfStateProvider {
42
43 private static final int DEFAULT_EVENTS_QUEUE_SIZE = 127;
44 private static final int DEFAULT_EVENTS_CHUNK_SIZE = 127;
45
46 private final ITmfTrace fTrace;
47 private final BufferedBlockingQueue<ITmfEvent> fEventsQueue;
48 private final Thread fEventHandlerThread;
49
50 private boolean fStateSystemAssigned;
51 /** State system in which to insert the state changes */
52 private @Nullable ITmfStateSystemBuilder fSS = null;
53 private @Nullable Throwable fFailureCause = null;
54
55 /* The last safe time at which this state provider can be queried */
56 private volatile long fSafeTime;
57
58 /*
59 * An exception propagation runnable. If an exception occurred in Event
60 * Processor thread, this field should be updated so that the "main" thread
61 * will propagate the exception
62 */
63 private Runnable fPropagateExceptions = () -> {
64 // Do nothing, a new Runnable will be defined if exceptions occur in the
65 // threads
66 };
67
68
69 /**
70 * Instantiate a new state provider plugin.
71 *
72 * @param trace
73 * The LTTng 2.0 kernel trace directory
74 * @param id
75 * Name given to this state change input. Only used internally.
76 */
77 public AbstractTmfStateProvider(ITmfTrace trace, String id) {
78 this(trace, id, DEFAULT_EVENTS_QUEUE_SIZE, DEFAULT_EVENTS_CHUNK_SIZE);
79 }
80
81 /**
82 * Instantiate a new state provider. This constructor allows to fine-tune
83 * the size of the event processing queue. This can be useful to unit tests
84 * various situations.
85 *
86 * @param trace
87 * The trace
88 * @param id
89 * Name given to this state change input. Only used internally.
90 * @param queueSize
91 * The size of the queue, a.k.a the number of chunks that fit
92 * into the buffered queue.
93 * @param chunkSize
94 * The number of events that fit inside a single chunk of the
95 * queue
96 * @since 2.3
97 */
98 @VisibleForTesting
99 protected AbstractTmfStateProvider(ITmfTrace trace, String id, int queueSize, int chunkSize) {
100 if (queueSize <= 0 || chunkSize <= 0) {
101 throw new IllegalArgumentException("Cannot have negative sized buffer" + //$NON-NLS-1$
102 formatError("queueSize", queueSize) + //$NON-NLS-1$
103 formatError("chunkSize", chunkSize)); //$NON-NLS-1$
104 }
105 fTrace = trace;
106 fEventsQueue = new BufferedBlockingQueue<>(queueSize, chunkSize);
107 fStateSystemAssigned = false;
108 // set the safe time to before the trace start, the analysis has not yet
109 // started
110 fSafeTime = trace.getStartTime().toNanos() - 1;
111 fEventHandlerThread = new Thread(() -> SafeRunner.run(new EventProcessor()), id + " Event Handler"); //$NON-NLS-1$
112 }
113
114 private static String formatError(String name, int value) {
115 return (value <= 0) ? " " + name + " = " + value : ""; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
116 }
117
118 /**
119 * Get the state system builder of this provider (to insert states in).
120 *
121 * @return The state system object to be filled
122 */
123 protected @Nullable ITmfStateSystemBuilder getStateSystemBuilder() {
124 return fSS;
125 }
126
127 @Override
128 public ITmfTrace getTrace() {
129 return fTrace;
130 }
131
132 @Override
133 public long getStartTime() {
134 return fTrace.getStartTime().toNanos();
135 }
136
137 /**
138 * @since 2.0
139 */
140 @Override
141 public long getLatestSafeTime() {
142 return fSafeTime;
143 }
144
145 @Override
146 public void assignTargetStateSystem(ITmfStateSystemBuilder ssb) {
147 fSS = ssb;
148 fStateSystemAssigned = true;
149 fEventHandlerThread.start();
150 }
151
152 @Override
153 public @Nullable ITmfStateSystem getAssignedStateSystem() {
154 return fSS;
155 }
156
157 @Override
158 public void dispose() {
159 /*
160 * Insert a null event in the queue to stop the event handler's thread.
161 */
162 try {
163 fEventsQueue.put(END_EVENT);
164 fEventsQueue.flushInputBuffer();
165 fEventHandlerThread.join();
166 } catch (InterruptedException e) {
167 Activator.logError("Error disposing state provider", e); //$NON-NLS-1$
168 }
169 fStateSystemAssigned = false;
170 fSS = null;
171 }
172
173 @Override
174 public void processEvent(ITmfEvent event) {
175 /* Make sure the target state system has been assigned */
176 if (!fStateSystemAssigned) {
177 throw new IllegalStateException("Cannot process event without a target state system. ID: " + getClass().getSimpleName()); //$NON-NLS-1$
178 }
179 fPropagateExceptions.run();
180
181 /* Insert the event we're received into the events queue */
182 ITmfEvent curEvent = event;
183 fEventsQueue.put(curEvent);
184 }
185
186 /**
187 * @since 3.0
188 */
189 @Override
190 public void fail(Throwable cause) {
191 fFailureCause = cause;
192 }
193
194 /**
195 * @since 3.0
196 */
197 @Override
198 public @Nullable Throwable getFailureCause() {
199 return fFailureCause;
200 }
201
202 /**
203 * Block the caller until the events queue is empty.
204 */
205 public void waitForEmptyQueue() {
206 /*
207 * We will first insert a dummy event that is guaranteed to not modify
208 * the state. That way, when that event leaves the queue, we will know
209 * for sure that the state system processed the preceding real event.
210 */
211 try {
212 fEventsQueue.put(EMPTY_QUEUE_EVENT);
213 fEventsQueue.flushInputBuffer();
214 while (!fEventsQueue.isEmpty()) {
215 Thread.sleep(100);
216 }
217 } catch (InterruptedException e) {
218 e.printStackTrace();
219 }
220 }
221
222 // ------------------------------------------------------------------------
223 // Special event types
224 // ------------------------------------------------------------------------
225
226 /**
227 * Fake event indicating the build is over, and the provider should close
228 */
229 private static class EndEvent extends TmfEvent {
230 public EndEvent() {
231 super(null, ITmfContext.UNKNOWN_RANK, null, null, null);
232 }
233 }
234
235 /** Fake event indicating we want to clear the current queue */
236 private static class EmptyQueueEvent extends TmfEvent {
237 public EmptyQueueEvent() {
238 super(null, ITmfContext.UNKNOWN_RANK, null, null, null);
239 }
240 }
241
242 private static final EndEvent END_EVENT = new EndEvent();
243 private static final EmptyQueueEvent EMPTY_QUEUE_EVENT = new EmptyQueueEvent();
244
245 // ------------------------------------------------------------------------
246 // Inner classes
247 // ------------------------------------------------------------------------
248
249 /**
250 * This is the runner class for the second thread, which will take the
251 * events from the queue and pass them through the state system.
252 */
253 private class EventProcessor implements ISafeRunnable {
254
255 private @Nullable ITmfEvent currentEvent;
256 private boolean fDone = false;
257
258 @Override
259 public void run() {
260 if (!fStateSystemAssigned) {
261 Activator.logError("Cannot run event manager without assigning a target state system first!"); //$NON-NLS-1$
262 return;
263 }
264
265 /*
266 * We never insert null in the queue. Cannot be checked at
267 * compile-time until Java 8 annotations...
268 */
269 ITmfEvent event = fEventsQueue.take();
270 /* This is a singleton, we want to do != instead of !x.equals */
271 while (event != END_EVENT) {
272 if (event == EMPTY_QUEUE_EVENT) {
273 /* Synchronization event, should be ignored */
274 event = fEventsQueue.take();
275 continue;
276 }
277 currentEvent = event;
278 fSafeTime = event.getTimestamp().toNanos() - 1;
279 eventHandle(event);
280 event = fEventsQueue.take();
281 }
282 fDone = true;
283 /* We've received the last event, clean up */
284 done();
285 closeStateSystem();
286 }
287
288 private void closeStateSystem() {
289 ITmfEvent event = currentEvent;
290 final long endTime = (event == null) ? 0 : event.getTimestamp().toNanos();
291
292 if (fSS != null) {
293 fSS.closeHistory(endTime);
294 }
295 }
296
297 @Override
298 public void handleException(@Nullable Throwable exception) {
299 // Update the propagation runnable
300 final RuntimeException rException = (exception instanceof RuntimeException) ? (RuntimeException) exception : new RuntimeException("Error in threaded state history backend", exception); //$NON-NLS-1$
301 fail(rException);
302 fPropagateExceptions = () -> {
303 // This exception should be caught by the thread that does the
304 // insertions and trigger the cancellation mechanism
305 throw rException;
306 };
307 if (fDone) {
308 /*
309 * The last event was already processed, the exception was
310 * thrown from the closing of the state system, just return
311 */
312 return;
313 }
314 /* drain */
315 ITmfEvent event = fEventsQueue.take();
316 while (event != END_EVENT) {
317 if (event == EMPTY_QUEUE_EVENT) {
318 /* Synchronization event, should be ignored */
319 event = fEventsQueue.take();
320 continue;
321 }
322 event = fEventsQueue.take();
323 }
324
325 /* We've received the last event, clean up */
326 closeStateSystem();
327 }
328 }
329
330 // ------------------------------------------------------------------------
331 // Abstract methods
332 // ------------------------------------------------------------------------
333
334 /**
335 * Handle the given event and send the appropriate state transitions into
336 * the the state system.
337 *
338 * This is basically the same thing as IStateChangeInput.processEvent(),
339 * except here processEvent() and eventHandle() are run in two different
340 * threads (and the AbstractStateChangeInput takes care of processEvent()
341 * already).
342 *
343 * @param event
344 * The event to process. If you need a specific event type, you
345 * should check for its instance right at the beginning.
346 */
347 protected abstract void eventHandle(ITmfEvent event);
348
349 }
This page took 0.03905 seconds and 5 git commands to generate.