Merge branch 'master' into lttng-luna
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / internal / tmf / core / statesystem / backends / partial / PartialHistoryBackend.java
1 /*******************************************************************************
2 * Copyright (c) 2013 Ericsson
3 * All rights reserved. This program and the accompanying materials are
4 * made available under the terms of the Eclipse Public License v1.0 which
5 * accompanies this distribution, and is available at
6 * http://www.eclipse.org/legal/epl-v10.html
7 *
8 * Contributors:
9 * Alexandre Montplaisir - Initial API and implementation
10 *******************************************************************************/
11
12 package org.eclipse.linuxtools.internal.tmf.core.statesystem.backends.partial;
13
14 import java.io.File;
15 import java.io.FileInputStream;
16 import java.io.PrintWriter;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.TreeMap;
20 import java.util.concurrent.CountDownLatch;
21
22 import org.eclipse.linuxtools.internal.tmf.core.statesystem.backends.IStateHistoryBackend;
23 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
24 import org.eclipse.linuxtools.tmf.core.exceptions.AttributeNotFoundException;
25 import org.eclipse.linuxtools.tmf.core.exceptions.StateSystemDisposedException;
26 import org.eclipse.linuxtools.tmf.core.exceptions.TimeRangeException;
27 import org.eclipse.linuxtools.tmf.core.interval.ITmfStateInterval;
28 import org.eclipse.linuxtools.tmf.core.interval.TmfStateInterval;
29 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
30 import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
31 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
32 import org.eclipse.linuxtools.tmf.core.request.TmfEventRequest;
33 import org.eclipse.linuxtools.tmf.core.statesystem.AbstractTmfStateProvider;
34 import org.eclipse.linuxtools.tmf.core.statesystem.ITmfStateProvider;
35 import org.eclipse.linuxtools.tmf.core.statevalue.ITmfStateValue;
36 import org.eclipse.linuxtools.tmf.core.timestamp.ITmfTimestamp;
37 import org.eclipse.linuxtools.tmf.core.timestamp.TmfTimeRange;
38 import org.eclipse.linuxtools.tmf.core.timestamp.TmfTimestamp;
39 import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace;
40
41 /**
42 * Partial state history back-end.
43 *
44 * This is a shim inserted between the real state system and a "real" history
45 * back-end. It will keep checkpoints, every n trace events (where n is called
46 * the granularity) and will only forward to the real state history the state
47 * intervals that crosses at least one checkpoint. Every other interval will
48 * be discarded.
49 *
50 * This would mean that it can only answer queries exactly at the checkpoints.
51 * For any other timestamps (ie, most of the time), it will load the closest
52 * earlier checkpoint, and will re-feed the state-change-input with events from
53 * the trace, to restore the real state at the time that was requested.
54 *
55 * @author Alexandre Montplaisir
56 */
57 public class PartialHistoryBackend implements IStateHistoryBackend {
58
59 /**
60 * A partial history needs the state input plugin to re-generate state
61 * between checkpoints.
62 */
63 private final ITmfStateProvider partialInput;
64
65 /**
66 * Fake state system that is used for partially rebuilding the states (when
67 * going from a checkpoint to a target query timestamp).
68 */
69 private final PartialStateSystem partialSS;
70
71 /** Reference to the "real" state history that is used for storage */
72 private final IStateHistoryBackend innerHistory;
73
74 /** Checkpoints map, <Timestamp, Rank in the trace> */
75 private final TreeMap<Long, Long> checkpoints =
76 new TreeMap<Long, Long>();
77
78 /** Latch tracking if the initial checkpoint registration is done */
79 private final CountDownLatch checkpointsReady = new CountDownLatch(1);
80
81 private final long granularity;
82
83 private long latestTime;
84
85 /**
86 * Constructor
87 *
88 * @param partialInput
89 * The state change input object that was used to build the
90 * upstream state system. This partial history will make its own
91 * copy (since they have different targets).
92 * @param pss
93 * The partial history's inner state system. It should already be
94 * assigned to partialInput.
95 * @param realBackend
96 * The real state history back-end to use. It's supposed to be
97 * modular, so it should be able to be of any type.
98 * @param granularity
99 * Configuration parameter indicating how many trace events there
100 * should be between each checkpoint
101 */
102 public PartialHistoryBackend(ITmfStateProvider partialInput, PartialStateSystem pss,
103 IStateHistoryBackend realBackend, long granularity) {
104 if (granularity <= 0 || partialInput == null || pss == null ||
105 partialInput.getAssignedStateSystem() != pss) {
106 throw new IllegalArgumentException();
107 }
108
109 final long startTime = realBackend.getStartTime();
110
111 this.partialInput = partialInput;
112 this.partialSS = pss;
113
114 this.innerHistory = realBackend;
115 this.granularity = granularity;
116
117 latestTime = startTime;
118
119 registerCheckpoints();
120 }
121
122 private void registerCheckpoints() {
123 ITmfEventRequest request = new CheckpointsRequest(partialInput, checkpoints);
124 partialInput.getTrace().sendRequest(request);
125 /* The request will countDown the checkpoints latch once it's finished */
126 }
127
128 @Override
129 public long getStartTime() {
130 return innerHistory.getStartTime();
131 }
132
133 @Override
134 public long getEndTime() {
135 return latestTime;
136 }
137
138 @Override
139 public void insertPastState(long stateStartTime, long stateEndTime,
140 int quark, ITmfStateValue value) throws TimeRangeException {
141 waitForCheckpoints();
142
143 /* Update the latest time */
144 if (stateEndTime > latestTime) {
145 latestTime = stateEndTime;
146 }
147
148 /*
149 * Check if the interval intersects the previous checkpoint. If so,
150 * insert it in the real history back-end.
151 *
152 * FIXME since intervals are inserted in order of rank, we could avoid
153 * doing a map lookup every time here (just compare with the known
154 * previous one).
155 */
156 if (stateStartTime <= checkpoints.floorKey(stateEndTime)) {
157 innerHistory.insertPastState(stateStartTime, stateEndTime, quark, value);
158 }
159 }
160
161 @Override
162 public void finishedBuilding(long endTime) throws TimeRangeException {
163 innerHistory.finishedBuilding(endTime);
164 }
165
166 @Override
167 public FileInputStream supplyAttributeTreeReader() {
168 return innerHistory.supplyAttributeTreeReader();
169 }
170
171 @Override
172 public File supplyAttributeTreeWriterFile() {
173 return innerHistory.supplyAttributeTreeWriterFile();
174 }
175
176 @Override
177 public long supplyAttributeTreeWriterFilePosition() {
178 return innerHistory.supplyAttributeTreeWriterFilePosition();
179 }
180
181 @Override
182 public void removeFiles() {
183 innerHistory.removeFiles();
184 }
185
186 @Override
187 public void dispose() {
188 innerHistory.dispose();
189 }
190
191 @Override
192 public void doQuery(List<ITmfStateInterval> currentStateInfo, long t)
193 throws TimeRangeException, StateSystemDisposedException {
194 /* Wait for required steps to be done */
195 waitForCheckpoints();
196 partialSS.getUpstreamSS().waitUntilBuilt();
197
198 if (!checkValidTime(t)) {
199 throw new TimeRangeException();
200 }
201
202 /* Reload the previous checkpoint */
203 long checkpointTime = checkpoints.floorKey(t);
204 innerHistory.doQuery(currentStateInfo, checkpointTime);
205
206 /*
207 * Set the initial contents of the partial state system (which is the
208 * contents of the query at the checkpoint).
209 */
210 partialSS.takeQueryLock();
211 partialSS.replaceOngoingState(currentStateInfo);
212
213 /* Send an event request to update the state system to the target time. */
214 TmfTimeRange range = new TmfTimeRange(
215 /*
216 * The state at the checkpoint already includes any state change
217 * caused by the event(s) happening exactly at 'checkpointTime',
218 * if any. We must not include those events in the query.
219 */
220 new TmfTimestamp(checkpointTime + 1, ITmfTimestamp.NANOSECOND_SCALE),
221 new TmfTimestamp(t, ITmfTimestamp.NANOSECOND_SCALE));
222 ITmfEventRequest request = new PartialStateSystemRequest(partialInput, range);
223 partialInput.getTrace().sendRequest(request);
224
225 try {
226 request.waitForCompletion();
227 } catch (InterruptedException e) {
228 e.printStackTrace();
229 }
230
231 /*
232 * Now the partial state system should have the ongoing time we are
233 * looking for. However, the method expects a List of *state intervals*,
234 * not state values, so we'll create intervals with a dummy end time.
235 */
236 try {
237 for (int i = 0; i < currentStateInfo.size(); i++) {
238 long start = 0;
239 ITmfStateValue val = null;
240 start = partialSS.getOngoingStartTime(i);
241 val = partialSS.queryOngoingState(i);
242
243 ITmfStateInterval interval = new TmfStateInterval(start, t, i, val);
244 currentStateInfo.set(i, interval);
245 }
246 } catch (AttributeNotFoundException e) {
247 /* Should not happen, we iterate over existing values. */
248 e.printStackTrace();
249 }
250
251 partialSS.releaseQueryLock();
252 }
253
254 /**
255 * Single queries are not supported in partial histories. To get the same
256 * result you can do a full query, then call fullState.get(attribute).
257 */
258 @Override
259 public ITmfStateInterval doSingularQuery(long t, int attributeQuark) {
260 throw new UnsupportedOperationException();
261 }
262
263 @Override
264 public boolean checkValidTime(long t) {
265 return (t >= getStartTime() && t <= getEndTime());
266 }
267
268 @Override
269 public void debugPrint(PrintWriter writer) {
270 // TODO Auto-generated method stub
271 }
272
273 private void waitForCheckpoints() {
274 try {
275 checkpointsReady.await();
276 } catch (InterruptedException e) {
277 e.printStackTrace();
278 }
279 }
280
281 // ------------------------------------------------------------------------
282 // Event requests types
283 // ------------------------------------------------------------------------
284
285 private class CheckpointsRequest extends TmfEventRequest {
286 private final ITmfTrace trace;
287 private final Map<Long, Long> checkpts;
288 private long eventCount;
289 private long lastCheckpointAt;
290
291 public CheckpointsRequest(ITmfStateProvider input, Map<Long, Long> checkpoints) {
292 super(input.getExpectedEventType(),
293 TmfTimeRange.ETERNITY,
294 0,
295 TmfDataRequest.ALL_DATA,
296 ITmfDataRequest.ExecutionType.BACKGROUND);
297 checkpoints.clear();
298 this.trace = input.getTrace();
299 this.checkpts = checkpoints;
300 eventCount = 0;
301 lastCheckpointAt = 0;
302
303 /* Insert a checkpoint at the start of the trace */
304 checkpoints.put(input.getStartTime(), 0L);
305 }
306
307 @Override
308 public void handleData(final ITmfEvent event) {
309 super.handleData(event);
310 if (event != null && event.getTrace() == trace) {
311 eventCount++;
312
313 /* Check if we need to register a new checkpoint */
314 if (eventCount >= lastCheckpointAt + granularity) {
315 checkpts.put(event.getTimestamp().getValue(), eventCount);
316 lastCheckpointAt = eventCount;
317 }
318 }
319 }
320
321 @Override
322 public void handleCompleted() {
323 super.handleCompleted();
324 checkpointsReady.countDown();
325 }
326 }
327
328 private class PartialStateSystemRequest extends TmfEventRequest {
329 private final ITmfStateProvider sci;
330 private final ITmfTrace trace;
331
332 PartialStateSystemRequest(ITmfStateProvider sci, TmfTimeRange range) {
333 super(sci.getExpectedEventType(),
334 range,
335 0,
336 TmfDataRequest.ALL_DATA,
337 ITmfDataRequest.ExecutionType.BACKGROUND);
338 this.sci = sci;
339 this.trace = sci.getTrace();
340 }
341
342 @Override
343 public void handleData(final ITmfEvent event) {
344 super.handleData(event);
345 if (event != null && event.getTrace() == trace) {
346 sci.processEvent(event);
347 }
348 }
349
350 @Override
351 public void handleCompleted() {
352 /*
353 * If we're using a threaded state provider, we need to make sure
354 * all events have been handled by the state system before doing
355 * queries on it.
356 */
357 if (partialInput instanceof AbstractTmfStateProvider) {
358 ((AbstractTmfStateProvider) partialInput).waitForEmptyQueue();
359 }
360 super.handleCompleted();
361 }
362
363
364 }
365 }
This page took 0.05024 seconds and 6 git commands to generate.