1 /*******************************************************************************
2 * Copyright (c) 2013 Ericsson
3 * All rights reserved. This program and the accompanying materials are
4 * made available under the terms of the Eclipse Public License v1.0 which
5 * accompanies this distribution, and is available at
6 * http://www.eclipse.org/legal/epl-v10.html
9 * Alexandre Montplaisir - Initial API and implementation
10 *******************************************************************************/
12 package org
.eclipse
.linuxtools
.internal
.tmf
.core
.statesystem
.backends
.partial
;
15 import java
.io
.FileInputStream
;
16 import java
.io
.PrintWriter
;
17 import java
.util
.List
;
19 import java
.util
.TreeMap
;
20 import java
.util
.concurrent
.CountDownLatch
;
22 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.statesystem
.backends
.IStateHistoryBackend
;
23 import org
.eclipse
.linuxtools
.tmf
.core
.event
.ITmfEvent
;
24 import org
.eclipse
.linuxtools
.tmf
.core
.exceptions
.AttributeNotFoundException
;
25 import org
.eclipse
.linuxtools
.tmf
.core
.exceptions
.StateSystemDisposedException
;
26 import org
.eclipse
.linuxtools
.tmf
.core
.exceptions
.TimeRangeException
;
27 import org
.eclipse
.linuxtools
.tmf
.core
.interval
.ITmfStateInterval
;
28 import org
.eclipse
.linuxtools
.tmf
.core
.interval
.TmfStateInterval
;
29 import org
.eclipse
.linuxtools
.tmf
.core
.request
.ITmfEventRequest
;
30 import org
.eclipse
.linuxtools
.tmf
.core
.request
.TmfEventRequest
;
31 import org
.eclipse
.linuxtools
.tmf
.core
.statesystem
.AbstractTmfStateProvider
;
32 import org
.eclipse
.linuxtools
.tmf
.core
.statesystem
.ITmfStateProvider
;
33 import org
.eclipse
.linuxtools
.tmf
.core
.statevalue
.ITmfStateValue
;
34 import org
.eclipse
.linuxtools
.tmf
.core
.timestamp
.ITmfTimestamp
;
35 import org
.eclipse
.linuxtools
.tmf
.core
.timestamp
.TmfTimeRange
;
36 import org
.eclipse
.linuxtools
.tmf
.core
.timestamp
.TmfTimestamp
;
37 import org
.eclipse
.linuxtools
.tmf
.core
.trace
.ITmfTrace
;
40 * Partial state history back-end.
42 * This is a shim inserted between the real state system and a "real" history
43 * back-end. It will keep checkpoints, every n trace events (where n is called
44 * the granularity) and will only forward to the real state history the state
45 * intervals that crosses at least one checkpoint. Every other interval will
48 * This would mean that it can only answer queries exactly at the checkpoints.
49 * For any other timestamps (ie, most of the time), it will load the closest
50 * earlier checkpoint, and will re-feed the state-change-input with events from
51 * the trace, to restore the real state at the time that was requested.
53 * @author Alexandre Montplaisir
55 public class PartialHistoryBackend
implements IStateHistoryBackend
{
58 * A partial history needs the state input plugin to re-generate state
59 * between checkpoints.
61 private final ITmfStateProvider partialInput
;
64 * Fake state system that is used for partially rebuilding the states (when
65 * going from a checkpoint to a target query timestamp).
67 private final PartialStateSystem partialSS
;
69 /** Reference to the "real" state history that is used for storage */
70 private final IStateHistoryBackend innerHistory
;
72 /** Checkpoints map, <Timestamp, Rank in the trace> */
73 private final TreeMap
<Long
, Long
> checkpoints
= new TreeMap
<>();
75 /** Latch tracking if the initial checkpoint registration is done */
76 private final CountDownLatch checkpointsReady
= new CountDownLatch(1);
78 private final long granularity
;
80 private long latestTime
;
86 * The state change input object that was used to build the
87 * upstream state system. This partial history will make its own
88 * copy (since they have different targets).
90 * The partial history's inner state system. It should already be
91 * assigned to partialInput.
93 * The real state history back-end to use. It's supposed to be
94 * modular, so it should be able to be of any type.
96 * Configuration parameter indicating how many trace events there
97 * should be between each checkpoint
99 public PartialHistoryBackend(ITmfStateProvider partialInput
, PartialStateSystem pss
,
100 IStateHistoryBackend realBackend
, long granularity
) {
101 if (granularity
<= 0 || partialInput
== null || pss
== null ||
102 partialInput
.getAssignedStateSystem() != pss
) {
103 throw new IllegalArgumentException();
106 final long startTime
= realBackend
.getStartTime();
108 this.partialInput
= partialInput
;
109 this.partialSS
= pss
;
111 this.innerHistory
= realBackend
;
112 this.granularity
= granularity
;
114 latestTime
= startTime
;
116 registerCheckpoints();
119 private void registerCheckpoints() {
120 ITmfEventRequest request
= new CheckpointsRequest(partialInput
, checkpoints
);
121 partialInput
.getTrace().sendRequest(request
);
122 /* The request will countDown the checkpoints latch once it's finished */
126 public long getStartTime() {
127 return innerHistory
.getStartTime();
131 public long getEndTime() {
136 public void insertPastState(long stateStartTime
, long stateEndTime
,
137 int quark
, ITmfStateValue value
) throws TimeRangeException
{
138 waitForCheckpoints();
140 /* Update the latest time */
141 if (stateEndTime
> latestTime
) {
142 latestTime
= stateEndTime
;
146 * Check if the interval intersects the previous checkpoint. If so,
147 * insert it in the real history back-end.
149 * FIXME since intervals are inserted in order of rank, we could avoid
150 * doing a map lookup every time here (just compare with the known
153 if (stateStartTime
<= checkpoints
.floorKey(stateEndTime
)) {
154 innerHistory
.insertPastState(stateStartTime
, stateEndTime
, quark
, value
);
159 public void finishedBuilding(long endTime
) throws TimeRangeException
{
160 innerHistory
.finishedBuilding(endTime
);
164 public FileInputStream
supplyAttributeTreeReader() {
165 return innerHistory
.supplyAttributeTreeReader();
169 public File
supplyAttributeTreeWriterFile() {
170 return innerHistory
.supplyAttributeTreeWriterFile();
174 public long supplyAttributeTreeWriterFilePosition() {
175 return innerHistory
.supplyAttributeTreeWriterFilePosition();
179 public void removeFiles() {
180 innerHistory
.removeFiles();
184 public void dispose() {
185 innerHistory
.dispose();
189 public void doQuery(List
<ITmfStateInterval
> currentStateInfo
, long t
)
190 throws TimeRangeException
, StateSystemDisposedException
{
191 /* Wait for required steps to be done */
192 waitForCheckpoints();
193 partialSS
.getUpstreamSS().waitUntilBuilt();
195 if (!checkValidTime(t
)) {
196 throw new TimeRangeException();
199 /* Reload the previous checkpoint */
200 long checkpointTime
= checkpoints
.floorKey(t
);
201 innerHistory
.doQuery(currentStateInfo
, checkpointTime
);
204 * Set the initial contents of the partial state system (which is the
205 * contents of the query at the checkpoint).
207 partialSS
.takeQueryLock();
208 partialSS
.replaceOngoingState(currentStateInfo
);
210 /* Send an event request to update the state system to the target time. */
211 TmfTimeRange range
= new TmfTimeRange(
213 * The state at the checkpoint already includes any state change
214 * caused by the event(s) happening exactly at 'checkpointTime',
215 * if any. We must not include those events in the query.
217 new TmfTimestamp(checkpointTime
+ 1, ITmfTimestamp
.NANOSECOND_SCALE
),
218 new TmfTimestamp(t
, ITmfTimestamp
.NANOSECOND_SCALE
));
219 ITmfEventRequest request
= new PartialStateSystemRequest(partialInput
, range
);
220 partialInput
.getTrace().sendRequest(request
);
223 request
.waitForCompletion();
224 } catch (InterruptedException e
) {
229 * Now the partial state system should have the ongoing time we are
230 * looking for. However, the method expects a List of *state intervals*,
231 * not state values, so we'll create intervals with a dummy end time.
234 for (int i
= 0; i
< currentStateInfo
.size(); i
++) {
236 ITmfStateValue val
= null;
237 start
= partialSS
.getOngoingStartTime(i
);
238 val
= partialSS
.queryOngoingState(i
);
240 ITmfStateInterval interval
= new TmfStateInterval(start
, t
, i
, val
);
241 currentStateInfo
.set(i
, interval
);
243 } catch (AttributeNotFoundException e
) {
244 /* Should not happen, we iterate over existing values. */
248 partialSS
.releaseQueryLock();
252 * Single queries are not supported in partial histories. To get the same
253 * result you can do a full query, then call fullState.get(attribute).
256 public ITmfStateInterval
doSingularQuery(long t
, int attributeQuark
) {
257 throw new UnsupportedOperationException();
261 public boolean checkValidTime(long t
) {
262 return (t
>= getStartTime() && t
<= getEndTime());
266 public void debugPrint(PrintWriter writer
) {
267 // TODO Auto-generated method stub
270 private void waitForCheckpoints() {
272 checkpointsReady
.await();
273 } catch (InterruptedException e
) {
278 // ------------------------------------------------------------------------
279 // Event requests types
280 // ------------------------------------------------------------------------
282 private class CheckpointsRequest
extends TmfEventRequest
{
283 private final ITmfTrace trace
;
284 private final Map
<Long
, Long
> checkpts
;
285 private long eventCount
;
286 private long lastCheckpointAt
;
288 public CheckpointsRequest(ITmfStateProvider input
, Map
<Long
, Long
> checkpoints
) {
289 super(input
.getExpectedEventType(),
290 TmfTimeRange
.ETERNITY
,
292 ITmfEventRequest
.ALL_DATA
,
293 ITmfEventRequest
.ExecutionType
.BACKGROUND
);
295 this.trace
= input
.getTrace();
296 this.checkpts
= checkpoints
;
298 lastCheckpointAt
= 0;
300 /* Insert a checkpoint at the start of the trace */
301 checkpoints
.put(input
.getStartTime(), 0L);
305 public void handleData(final ITmfEvent event
) {
306 super.handleData(event
);
307 if (event
!= null && event
.getTrace() == trace
) {
310 /* Check if we need to register a new checkpoint */
311 if (eventCount
>= lastCheckpointAt
+ granularity
) {
312 checkpts
.put(event
.getTimestamp().getValue(), eventCount
);
313 lastCheckpointAt
= eventCount
;
319 public void handleCompleted() {
320 super.handleCompleted();
321 checkpointsReady
.countDown();
325 private class PartialStateSystemRequest
extends TmfEventRequest
{
326 private final ITmfStateProvider sci
;
327 private final ITmfTrace trace
;
329 PartialStateSystemRequest(ITmfStateProvider sci
, TmfTimeRange range
) {
330 super(sci
.getExpectedEventType(),
333 ITmfEventRequest
.ALL_DATA
,
334 ITmfEventRequest
.ExecutionType
.BACKGROUND
);
336 this.trace
= sci
.getTrace();
340 public void handleData(final ITmfEvent event
) {
341 super.handleData(event
);
342 if (event
!= null && event
.getTrace() == trace
) {
343 sci
.processEvent(event
);
348 public void handleCompleted() {
350 * If we're using a threaded state provider, we need to make sure
351 * all events have been handled by the state system before doing
354 if (partialInput
instanceof AbstractTmfStateProvider
) {
355 ((AbstractTmfStateProvider
) partialInput
).waitForEmptyQueue();
357 super.handleCompleted();