1 /*******************************************************************************
2 * Copyright (c) 2013, 2014 Ericsson
3 * All rights reserved. This program and the accompanying materials are
4 * made available under the terms of the Eclipse Public License v1.0 which
5 * accompanies this distribution, and is available at
6 * http://www.eclipse.org/legal/epl-v10.html
9 * Alexandre Montplaisir - Initial API and implementation
10 *******************************************************************************/
12 package org
.eclipse
.linuxtools
.internal
.tmf
.core
.statesystem
.backends
.partial
;
15 import java
.io
.FileInputStream
;
16 import java
.io
.PrintWriter
;
17 import java
.util
.List
;
19 import java
.util
.TreeMap
;
20 import java
.util
.concurrent
.CountDownLatch
;
22 import org
.eclipse
.linuxtools
.statesystem
.core
.ITmfStateSystem
;
23 import org
.eclipse
.linuxtools
.statesystem
.core
.backend
.IStateHistoryBackend
;
24 import org
.eclipse
.linuxtools
.statesystem
.core
.exceptions
.AttributeNotFoundException
;
25 import org
.eclipse
.linuxtools
.statesystem
.core
.exceptions
.StateSystemDisposedException
;
26 import org
.eclipse
.linuxtools
.statesystem
.core
.exceptions
.TimeRangeException
;
27 import org
.eclipse
.linuxtools
.statesystem
.core
.interval
.ITmfStateInterval
;
28 import org
.eclipse
.linuxtools
.statesystem
.core
.interval
.TmfStateInterval
;
29 import org
.eclipse
.linuxtools
.statesystem
.core
.statevalue
.ITmfStateValue
;
30 import org
.eclipse
.linuxtools
.tmf
.core
.event
.ITmfEvent
;
31 import org
.eclipse
.linuxtools
.tmf
.core
.request
.ITmfEventRequest
;
32 import org
.eclipse
.linuxtools
.tmf
.core
.request
.TmfEventRequest
;
33 import org
.eclipse
.linuxtools
.tmf
.core
.statesystem
.AbstractTmfStateProvider
;
34 import org
.eclipse
.linuxtools
.tmf
.core
.statesystem
.ITmfStateProvider
;
35 import org
.eclipse
.linuxtools
.tmf
.core
.timestamp
.ITmfTimestamp
;
36 import org
.eclipse
.linuxtools
.tmf
.core
.timestamp
.TmfTimeRange
;
37 import org
.eclipse
.linuxtools
.tmf
.core
.timestamp
.TmfTimestamp
;
38 import org
.eclipse
.linuxtools
.tmf
.core
.trace
.ITmfTrace
;
/**
 * Partial state history back-end.
 *
 * This is a shim inserted between the real state system and a "real" history
 * back-end. It will keep checkpoints, every n trace events (where n is called
 * the granularity) and will only forward to the real state history the state
 * intervals that cross at least one checkpoint. Every other interval will
 * be discarded.
 *
 * This would mean that it can only answer queries exactly at the checkpoints.
 * For any other timestamp (i.e., most of the time), it will load the closest
 * earlier checkpoint, and will re-feed the state-change-input with events from
 * the trace, to restore the real state at the time that was requested.
 *
 * @author Alexandre Montplaisir
 */
56 public class PartialHistoryBackend
implements IStateHistoryBackend
{
59 * A partial history needs the state input plugin to re-generate state
60 * between checkpoints.
62 private final ITmfStateProvider partialInput
;
65 * Fake state system that is used for partially rebuilding the states (when
66 * going from a checkpoint to a target query timestamp).
68 private final PartialStateSystem partialSS
;
70 /** Reference to the "real" state history that is used for storage */
71 private final IStateHistoryBackend innerHistory
;
73 /** Checkpoints map, <Timestamp, Rank in the trace> */
74 private final TreeMap
<Long
, Long
> checkpoints
= new TreeMap
<>();
76 /** Latch tracking if the initial checkpoint registration is done */
77 private final CountDownLatch checkpointsReady
= new CountDownLatch(1);
79 private final long granularity
;
81 private long latestTime
;
/**
 * Creates a partial history back-end wrapping a real back-end, and submits
 * the request that registers the checkpoints.
 *
 * @param partialInput
 *            The state provider used to re-generate state between
 *            checkpoints. It is stored as given (no copy is made here) and
 *            its assigned state system must be {@code pss}.
 * @param pss
 *            The partial history's inner state system. It must already be
 *            assigned to {@code partialInput}.
 * @param realBackend
 *            The real state history back-end to use for storage. It can be
 *            of any type.
 * @param granularity
 *            Configuration parameter indicating how many trace events there
 *            should be between each checkpoint. Must be strictly positive.
 * @throws IllegalArgumentException
 *             If the granularity is not strictly positive, if any of the
 *             object arguments is null, or if {@code pss} is not the state
 *             system assigned to {@code partialInput}
 */
public PartialHistoryBackend(ITmfStateProvider partialInput, PartialStateSystem pss,
        IStateHistoryBackend realBackend, long granularity) {
    if (granularity <= 0) {
        throw new IllegalArgumentException(
                "granularity must be strictly positive, got " + granularity); //$NON-NLS-1$
    }
    /*
     * realBackend is validated together with the other arguments, so that
     * a null value is reported the same way instead of failing with a
     * NullPointerException further down.
     */
    if (partialInput == null || pss == null || realBackend == null) {
        throw new IllegalArgumentException(
                "partialInput, pss and realBackend must not be null"); //$NON-NLS-1$
    }
    /* Identity comparison: it has to be that very state system instance */
    if (partialInput.getAssignedStateSystem() != pss) {
        throw new IllegalArgumentException(
                "pss is not the state system assigned to partialInput"); //$NON-NLS-1$
    }

    this.partialInput = partialInput;
    this.partialSS = pss;
    this.innerHistory = realBackend;
    this.granularity = granularity;

    /* Until intervals get inserted, the history ends where it starts */
    this.latestTime = realBackend.getStartTime();

    /* checkpointsReady is counted down once this request has completed */
    registerCheckpoints();
}
120 private void registerCheckpoints() {
121 ITmfEventRequest request
= new CheckpointsRequest(partialInput
, checkpoints
);
122 partialInput
.getTrace().sendRequest(request
);
123 /* The request will countDown the checkpoints latch once it's finished */
127 public long getStartTime() {
128 return innerHistory
.getStartTime();
132 public long getEndTime() {
137 public void insertPastState(long stateStartTime
, long stateEndTime
,
138 int quark
, ITmfStateValue value
) throws TimeRangeException
{
139 waitForCheckpoints();
141 /* Update the latest time */
142 if (stateEndTime
> latestTime
) {
143 latestTime
= stateEndTime
;
147 * Check if the interval intersects the previous checkpoint. If so,
148 * insert it in the real history back-end.
150 * FIXME since intervals are inserted in order of rank, we could avoid
151 * doing a map lookup every time here (just compare with the known
154 if (stateStartTime
<= checkpoints
.floorKey(stateEndTime
)) {
155 innerHistory
.insertPastState(stateStartTime
, stateEndTime
, quark
, value
);
160 public void finishedBuilding(long endTime
) throws TimeRangeException
{
161 innerHistory
.finishedBuilding(endTime
);
165 public FileInputStream
supplyAttributeTreeReader() {
166 return innerHistory
.supplyAttributeTreeReader();
170 public File
supplyAttributeTreeWriterFile() {
171 return innerHistory
.supplyAttributeTreeWriterFile();
175 public long supplyAttributeTreeWriterFilePosition() {
176 return innerHistory
.supplyAttributeTreeWriterFilePosition();
180 public void removeFiles() {
181 innerHistory
.removeFiles();
185 public void dispose() {
186 innerHistory
.dispose();
190 public void doQuery(List
<ITmfStateInterval
> currentStateInfo
, long t
)
191 throws TimeRangeException
, StateSystemDisposedException
{
192 /* Wait for required steps to be done */
193 waitForCheckpoints();
194 partialSS
.getUpstreamSS().waitUntilBuilt();
196 if (!checkValidTime(t
)) {
197 throw new TimeRangeException();
200 /* Reload the previous checkpoint */
201 long checkpointTime
= checkpoints
.floorKey(t
);
202 innerHistory
.doQuery(currentStateInfo
, checkpointTime
);
205 * Set the initial contents of the partial state system (which is the
206 * contents of the query at the checkpoint).
208 partialSS
.takeQueryLock();
209 partialSS
.replaceOngoingState(currentStateInfo
);
211 /* Send an event request to update the state system to the target time. */
212 TmfTimeRange range
= new TmfTimeRange(
214 * The state at the checkpoint already includes any state change
215 * caused by the event(s) happening exactly at 'checkpointTime',
216 * if any. We must not include those events in the query.
218 new TmfTimestamp(checkpointTime
+ 1, ITmfTimestamp
.NANOSECOND_SCALE
),
219 new TmfTimestamp(t
, ITmfTimestamp
.NANOSECOND_SCALE
));
220 ITmfEventRequest request
= new PartialStateSystemRequest(partialInput
, range
);
221 partialInput
.getTrace().sendRequest(request
);
224 request
.waitForCompletion();
225 } catch (InterruptedException e
) {
230 * Now the partial state system should have the ongoing time we are
231 * looking for. However, the method expects a List of *state intervals*,
232 * not state values, so we'll create intervals with a dummy end time.
235 for (int i
= 0; i
< currentStateInfo
.size(); i
++) {
237 ITmfStateValue val
= null;
238 start
= ((ITmfStateSystem
) partialSS
).getOngoingStartTime(i
);
239 val
= ((ITmfStateSystem
) partialSS
).queryOngoingState(i
);
241 ITmfStateInterval interval
= new TmfStateInterval(start
, t
, i
, val
);
242 currentStateInfo
.set(i
, interval
);
244 } catch (AttributeNotFoundException e
) {
245 /* Should not happen, we iterate over existing values. */
249 partialSS
.releaseQueryLock();
253 * Single queries are not supported in partial histories. To get the same
254 * result you can do a full query, then call fullState.get(attribute).
257 public ITmfStateInterval
doSingularQuery(long t
, int attributeQuark
) {
258 throw new UnsupportedOperationException();
262 public boolean checkValidTime(long t
) {
263 return (t
>= getStartTime() && t
<= getEndTime());
267 public void debugPrint(PrintWriter writer
) {
268 // TODO Auto-generated method stub
271 private void waitForCheckpoints() {
273 checkpointsReady
.await();
274 } catch (InterruptedException e
) {
279 // ------------------------------------------------------------------------
280 // Event requests types
281 // ------------------------------------------------------------------------
283 private class CheckpointsRequest
extends TmfEventRequest
{
284 private final ITmfTrace trace
;
285 private final Map
<Long
, Long
> checkpts
;
286 private long eventCount
;
287 private long lastCheckpointAt
;
289 public CheckpointsRequest(ITmfStateProvider input
, Map
<Long
, Long
> checkpoints
) {
290 super(input
.getExpectedEventType(),
291 TmfTimeRange
.ETERNITY
,
293 ITmfEventRequest
.ALL_DATA
,
294 ITmfEventRequest
.ExecutionType
.FOREGROUND
);
296 this.trace
= input
.getTrace();
297 this.checkpts
= checkpoints
;
299 lastCheckpointAt
= 0;
301 /* Insert a checkpoint at the start of the trace */
302 checkpoints
.put(input
.getStartTime(), 0L);
306 public void handleData(final ITmfEvent event
) {
307 super.handleData(event
);
308 if (event
!= null && event
.getTrace() == trace
) {
311 /* Check if we need to register a new checkpoint */
312 if (eventCount
>= lastCheckpointAt
+ granularity
) {
313 checkpts
.put(event
.getTimestamp().getValue(), eventCount
);
314 lastCheckpointAt
= eventCount
;
320 public void handleCompleted() {
321 super.handleCompleted();
322 checkpointsReady
.countDown();
326 private class PartialStateSystemRequest
extends TmfEventRequest
{
327 private final ITmfStateProvider sci
;
328 private final ITmfTrace trace
;
330 PartialStateSystemRequest(ITmfStateProvider sci
, TmfTimeRange range
) {
331 super(sci
.getExpectedEventType(),
334 ITmfEventRequest
.ALL_DATA
,
335 ITmfEventRequest
.ExecutionType
.BACKGROUND
);
337 this.trace
= sci
.getTrace();
341 public void handleData(final ITmfEvent event
) {
342 super.handleData(event
);
343 if (event
!= null && event
.getTrace() == trace
) {
344 sci
.processEvent(event
);
349 public void handleCompleted() {
351 * If we're using a threaded state provider, we need to make sure
352 * all events have been handled by the state system before doing
355 if (partialInput
instanceof AbstractTmfStateProvider
) {
356 ((AbstractTmfStateProvider
) partialInput
).waitForEmptyQueue();
358 super.handleCompleted();