tmf: Split the state system in a separate plugin
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / internal / tmf / core / statesystem / backends / partial / PartialHistoryBackend.java
1 /*******************************************************************************
2 * Copyright (c) 2013, 2014 Ericsson
3 * All rights reserved. This program and the accompanying materials are
4 * made available under the terms of the Eclipse Public License v1.0 which
5 * accompanies this distribution, and is available at
6 * http://www.eclipse.org/legal/epl-v10.html
7 *
8 * Contributors:
9 * Alexandre Montplaisir - Initial API and implementation
10 *******************************************************************************/
11
12 package org.eclipse.linuxtools.internal.tmf.core.statesystem.backends.partial;
13
14 import java.io.File;
15 import java.io.FileInputStream;
16 import java.io.PrintWriter;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.TreeMap;
20 import java.util.concurrent.CountDownLatch;
21
22 import org.eclipse.linuxtools.statesystem.core.ITmfStateSystem;
23 import org.eclipse.linuxtools.statesystem.core.backend.IStateHistoryBackend;
24 import org.eclipse.linuxtools.statesystem.core.exceptions.AttributeNotFoundException;
25 import org.eclipse.linuxtools.statesystem.core.exceptions.StateSystemDisposedException;
26 import org.eclipse.linuxtools.statesystem.core.exceptions.TimeRangeException;
27 import org.eclipse.linuxtools.statesystem.core.interval.ITmfStateInterval;
28 import org.eclipse.linuxtools.statesystem.core.interval.TmfStateInterval;
29 import org.eclipse.linuxtools.statesystem.core.statevalue.ITmfStateValue;
30 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
31 import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
32 import org.eclipse.linuxtools.tmf.core.request.TmfEventRequest;
33 import org.eclipse.linuxtools.tmf.core.statesystem.AbstractTmfStateProvider;
34 import org.eclipse.linuxtools.tmf.core.statesystem.ITmfStateProvider;
35 import org.eclipse.linuxtools.tmf.core.timestamp.ITmfTimestamp;
36 import org.eclipse.linuxtools.tmf.core.timestamp.TmfTimeRange;
37 import org.eclipse.linuxtools.tmf.core.timestamp.TmfTimestamp;
38 import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace;
39
40 /**
41 * Partial state history back-end.
42 *
43 * This is a shim inserted between the real state system and a "real" history
44 * back-end. It will keep checkpoints, every n trace events (where n is called
45 * the granularity) and will only forward to the real state history the state
46 * intervals that crosses at least one checkpoint. Every other interval will
47 * be discarded.
48 *
49 * This would mean that it can only answer queries exactly at the checkpoints.
50 * For any other timestamps (ie, most of the time), it will load the closest
51 * earlier checkpoint, and will re-feed the state-change-input with events from
52 * the trace, to restore the real state at the time that was requested.
53 *
54 * @author Alexandre Montplaisir
55 */
56 public class PartialHistoryBackend implements IStateHistoryBackend {
57
58 /**
59 * A partial history needs the state input plugin to re-generate state
60 * between checkpoints.
61 */
62 private final ITmfStateProvider partialInput;
63
64 /**
65 * Fake state system that is used for partially rebuilding the states (when
66 * going from a checkpoint to a target query timestamp).
67 */
68 private final PartialStateSystem partialSS;
69
70 /** Reference to the "real" state history that is used for storage */
71 private final IStateHistoryBackend innerHistory;
72
73 /** Checkpoints map, <Timestamp, Rank in the trace> */
74 private final TreeMap<Long, Long> checkpoints = new TreeMap<>();
75
76 /** Latch tracking if the initial checkpoint registration is done */
77 private final CountDownLatch checkpointsReady = new CountDownLatch(1);
78
79 private final long granularity;
80
81 private long latestTime;
82
83 /**
84 * Constructor
85 *
86 * @param partialInput
87 * The state change input object that was used to build the
88 * upstream state system. This partial history will make its own
89 * copy (since they have different targets).
90 * @param pss
91 * The partial history's inner state system. It should already be
92 * assigned to partialInput.
93 * @param realBackend
94 * The real state history back-end to use. It's supposed to be
95 * modular, so it should be able to be of any type.
96 * @param granularity
97 * Configuration parameter indicating how many trace events there
98 * should be between each checkpoint
99 */
100 public PartialHistoryBackend(ITmfStateProvider partialInput, PartialStateSystem pss,
101 IStateHistoryBackend realBackend, long granularity) {
102 if (granularity <= 0 || partialInput == null || pss == null ||
103 partialInput.getAssignedStateSystem() != pss) {
104 throw new IllegalArgumentException();
105 }
106
107 final long startTime = realBackend.getStartTime();
108
109 this.partialInput = partialInput;
110 this.partialSS = pss;
111
112 this.innerHistory = realBackend;
113 this.granularity = granularity;
114
115 latestTime = startTime;
116
117 registerCheckpoints();
118 }
119
120 private void registerCheckpoints() {
121 ITmfEventRequest request = new CheckpointsRequest(partialInput, checkpoints);
122 partialInput.getTrace().sendRequest(request);
123 /* The request will countDown the checkpoints latch once it's finished */
124 }
125
126 @Override
127 public long getStartTime() {
128 return innerHistory.getStartTime();
129 }
130
131 @Override
132 public long getEndTime() {
133 return latestTime;
134 }
135
136 @Override
137 public void insertPastState(long stateStartTime, long stateEndTime,
138 int quark, ITmfStateValue value) throws TimeRangeException {
139 waitForCheckpoints();
140
141 /* Update the latest time */
142 if (stateEndTime > latestTime) {
143 latestTime = stateEndTime;
144 }
145
146 /*
147 * Check if the interval intersects the previous checkpoint. If so,
148 * insert it in the real history back-end.
149 *
150 * FIXME since intervals are inserted in order of rank, we could avoid
151 * doing a map lookup every time here (just compare with the known
152 * previous one).
153 */
154 if (stateStartTime <= checkpoints.floorKey(stateEndTime)) {
155 innerHistory.insertPastState(stateStartTime, stateEndTime, quark, value);
156 }
157 }
158
159 @Override
160 public void finishedBuilding(long endTime) throws TimeRangeException {
161 innerHistory.finishedBuilding(endTime);
162 }
163
164 @Override
165 public FileInputStream supplyAttributeTreeReader() {
166 return innerHistory.supplyAttributeTreeReader();
167 }
168
169 @Override
170 public File supplyAttributeTreeWriterFile() {
171 return innerHistory.supplyAttributeTreeWriterFile();
172 }
173
174 @Override
175 public long supplyAttributeTreeWriterFilePosition() {
176 return innerHistory.supplyAttributeTreeWriterFilePosition();
177 }
178
179 @Override
180 public void removeFiles() {
181 innerHistory.removeFiles();
182 }
183
184 @Override
185 public void dispose() {
186 innerHistory.dispose();
187 }
188
189 @Override
190 public void doQuery(List<ITmfStateInterval> currentStateInfo, long t)
191 throws TimeRangeException, StateSystemDisposedException {
192 /* Wait for required steps to be done */
193 waitForCheckpoints();
194 partialSS.getUpstreamSS().waitUntilBuilt();
195
196 if (!checkValidTime(t)) {
197 throw new TimeRangeException();
198 }
199
200 /* Reload the previous checkpoint */
201 long checkpointTime = checkpoints.floorKey(t);
202 innerHistory.doQuery(currentStateInfo, checkpointTime);
203
204 /*
205 * Set the initial contents of the partial state system (which is the
206 * contents of the query at the checkpoint).
207 */
208 partialSS.takeQueryLock();
209 partialSS.replaceOngoingState(currentStateInfo);
210
211 /* Send an event request to update the state system to the target time. */
212 TmfTimeRange range = new TmfTimeRange(
213 /*
214 * The state at the checkpoint already includes any state change
215 * caused by the event(s) happening exactly at 'checkpointTime',
216 * if any. We must not include those events in the query.
217 */
218 new TmfTimestamp(checkpointTime + 1, ITmfTimestamp.NANOSECOND_SCALE),
219 new TmfTimestamp(t, ITmfTimestamp.NANOSECOND_SCALE));
220 ITmfEventRequest request = new PartialStateSystemRequest(partialInput, range);
221 partialInput.getTrace().sendRequest(request);
222
223 try {
224 request.waitForCompletion();
225 } catch (InterruptedException e) {
226 e.printStackTrace();
227 }
228
229 /*
230 * Now the partial state system should have the ongoing time we are
231 * looking for. However, the method expects a List of *state intervals*,
232 * not state values, so we'll create intervals with a dummy end time.
233 */
234 try {
235 for (int i = 0; i < currentStateInfo.size(); i++) {
236 long start = 0;
237 ITmfStateValue val = null;
238 start = ((ITmfStateSystem) partialSS).getOngoingStartTime(i);
239 val = ((ITmfStateSystem) partialSS).queryOngoingState(i);
240
241 ITmfStateInterval interval = new TmfStateInterval(start, t, i, val);
242 currentStateInfo.set(i, interval);
243 }
244 } catch (AttributeNotFoundException e) {
245 /* Should not happen, we iterate over existing values. */
246 e.printStackTrace();
247 }
248
249 partialSS.releaseQueryLock();
250 }
251
252 /**
253 * Single queries are not supported in partial histories. To get the same
254 * result you can do a full query, then call fullState.get(attribute).
255 */
256 @Override
257 public ITmfStateInterval doSingularQuery(long t, int attributeQuark) {
258 throw new UnsupportedOperationException();
259 }
260
261 @Override
262 public boolean checkValidTime(long t) {
263 return (t >= getStartTime() && t <= getEndTime());
264 }
265
266 @Override
267 public void debugPrint(PrintWriter writer) {
268 // TODO Auto-generated method stub
269 }
270
271 private void waitForCheckpoints() {
272 try {
273 checkpointsReady.await();
274 } catch (InterruptedException e) {
275 e.printStackTrace();
276 }
277 }
278
279 // ------------------------------------------------------------------------
280 // Event requests types
281 // ------------------------------------------------------------------------
282
283 private class CheckpointsRequest extends TmfEventRequest {
284 private final ITmfTrace trace;
285 private final Map<Long, Long> checkpts;
286 private long eventCount;
287 private long lastCheckpointAt;
288
289 public CheckpointsRequest(ITmfStateProvider input, Map<Long, Long> checkpoints) {
290 super(input.getExpectedEventType(),
291 TmfTimeRange.ETERNITY,
292 0,
293 ITmfEventRequest.ALL_DATA,
294 ITmfEventRequest.ExecutionType.FOREGROUND);
295 checkpoints.clear();
296 this.trace = input.getTrace();
297 this.checkpts = checkpoints;
298 eventCount = 0;
299 lastCheckpointAt = 0;
300
301 /* Insert a checkpoint at the start of the trace */
302 checkpoints.put(input.getStartTime(), 0L);
303 }
304
305 @Override
306 public void handleData(final ITmfEvent event) {
307 super.handleData(event);
308 if (event != null && event.getTrace() == trace) {
309 eventCount++;
310
311 /* Check if we need to register a new checkpoint */
312 if (eventCount >= lastCheckpointAt + granularity) {
313 checkpts.put(event.getTimestamp().getValue(), eventCount);
314 lastCheckpointAt = eventCount;
315 }
316 }
317 }
318
319 @Override
320 public void handleCompleted() {
321 super.handleCompleted();
322 checkpointsReady.countDown();
323 }
324 }
325
326 private class PartialStateSystemRequest extends TmfEventRequest {
327 private final ITmfStateProvider sci;
328 private final ITmfTrace trace;
329
330 PartialStateSystemRequest(ITmfStateProvider sci, TmfTimeRange range) {
331 super(sci.getExpectedEventType(),
332 range,
333 0,
334 ITmfEventRequest.ALL_DATA,
335 ITmfEventRequest.ExecutionType.BACKGROUND);
336 this.sci = sci;
337 this.trace = sci.getTrace();
338 }
339
340 @Override
341 public void handleData(final ITmfEvent event) {
342 super.handleData(event);
343 if (event != null && event.getTrace() == trace) {
344 sci.processEvent(event);
345 }
346 }
347
348 @Override
349 public void handleCompleted() {
350 /*
351 * If we're using a threaded state provider, we need to make sure
352 * all events have been handled by the state system before doing
353 * queries on it.
354 */
355 if (partialInput instanceof AbstractTmfStateProvider) {
356 ((AbstractTmfStateProvider) partialInput).waitForEmptyQueue();
357 }
358 super.handleCompleted();
359 }
360
361 }
362 }
This page took 0.045346 seconds and 6 git commands to generate.