1 /*******************************************************************************
2 * Copyright (c) 2013, 2015 Ericsson
3 * All rights reserved. This program and the accompanying materials are
4 * made available under the terms of the Eclipse Public License v1.0 which
5 * accompanies this distribution, and is available at
6 * http://www.eclipse.org/legal/epl-v10.html
9 * Alexandre Montplaisir - Initial API and implementation
10 * Patrick Tasse - Add message to exceptions
11 *******************************************************************************/
13 package org
.eclipse
.tracecompass
.internal
.tmf
.core
.statesystem
.backends
.partial
;
import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull;
import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNullContents;

import java.io.File;
import java.io.FileInputStream;
import java.io.PrintWriter;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.tracecompass.statesystem.core.ITmfStateSystem;
import org.eclipse.tracecompass.statesystem.core.backend.IStateHistoryBackend;
import org.eclipse.tracecompass.statesystem.core.exceptions.AttributeNotFoundException;
import org.eclipse.tracecompass.statesystem.core.exceptions.StateSystemDisposedException;
import org.eclipse.tracecompass.statesystem.core.exceptions.TimeRangeException;
import org.eclipse.tracecompass.statesystem.core.interval.ITmfStateInterval;
import org.eclipse.tracecompass.statesystem.core.interval.TmfStateInterval;
import org.eclipse.tracecompass.statesystem.core.statevalue.ITmfStateValue;
import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
import org.eclipse.tracecompass.tmf.core.request.TmfEventRequest;
import org.eclipse.tracecompass.tmf.core.statesystem.AbstractTmfStateProvider;
import org.eclipse.tracecompass.tmf.core.statesystem.ITmfStateProvider;
import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimeRange;
import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimestamp;
import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace;

/**
 * Partial state history back-end.
 *
 * This is a shim inserted between the real state system and a "real" history
 * back-end. It keeps checkpoints every n trace events (where n is called the
 * granularity) and only forwards to the real state history the state
 * intervals that cross at least one checkpoint. Every other interval is
 * discarded.
 *
 * This means that the real history can only answer queries exactly at the
 * checkpoints. For any other timestamp (i.e., most of the time), this
 * back-end loads the closest earlier checkpoint and re-feeds the
 * state-change-input with events from the trace, to restore the real state
 * at the time that was requested.
 *
 * @author Alexandre Montplaisir
 */
63 public class PartialHistoryBackend
implements IStateHistoryBackend
{
65 private final @NonNull String fSSID
;
68 * A partial history needs the state input plugin to re-generate state
69 * between checkpoints.
71 private final @NonNull ITmfStateProvider fPartialInput
;
74 * Fake state system that is used for partially rebuilding the states (when
75 * going from a checkpoint to a target query timestamp).
77 private final @NonNull PartialStateSystem fPartialSS
;
79 /** Reference to the "real" state history that is used for storage */
80 private final @NonNull IStateHistoryBackend fInnerHistory
;
82 /** Checkpoints map, <Timestamp, Rank in the trace> */
83 private final @NonNull TreeMap
<Long
, Long
> fCheckpoints
= new TreeMap
<>();
85 /** Latch tracking if the initial checkpoint registration is done */
86 private final @NonNull CountDownLatch fCheckpointsReady
= new CountDownLatch(1);
88 private final long fGranularity
;
90 private long fLatestTime
;
/**
 * Builds the partial history back-end and immediately sends the request that
 * registers the checkpoints on the provider's trace.
 *
 * @param ssid
 *            The state system's ID
 * @param partialInput
 *            The state provider used by this partial history to re-generate
 *            the state between checkpoints. Must not be null.
 * @param pss
 *            The partial history's inner state system. It must already be
 *            the state system assigned to partialInput.
 * @param realBackend
 *            The real state history back-end to use for storage
 * @param granularity
 *            Configuration parameter indicating how many trace events there
 *            should be between each checkpoint. Must be strictly positive.
 * @throws IllegalArgumentException
 *             If granularity is not strictly positive, if partialInput or
 *             pss is null, or if pss is not the state system assigned to
 *             partialInput
 */
public PartialHistoryBackend(@NonNull String ssid,
        ITmfStateProvider partialInput,
        PartialStateSystem pss,
        IStateHistoryBackend realBackend,
        long granularity) {
    /* Same preconditions as before, now each with a diagnosable message */
    if (granularity <= 0) {
        throw new IllegalArgumentException("Granularity must be strictly positive: " + granularity); //$NON-NLS-1$
    }
    if (partialInput == null || pss == null) {
        throw new IllegalArgumentException("Partial state provider and partial state system must not be null"); //$NON-NLS-1$
    }
    if (partialInput.getAssignedStateSystem() != pss) {
        throw new IllegalArgumentException("The partial state system must be the one assigned to the partial state provider"); //$NON-NLS-1$
    }

    final long startTime = realBackend.getStartTime();

    fSSID = ssid;
    fPartialInput = partialInput;
    fPartialSS = pss;

    fInnerHistory = realBackend;
    fGranularity = granularity;

    /* Until intervals are inserted, the history ends where it starts */
    fLatestTime = startTime;

    registerCheckpoints();
}
135 private void registerCheckpoints() {
136 ITmfEventRequest request
= new CheckpointsRequest(fPartialInput
, fCheckpoints
);
137 fPartialInput
.getTrace().sendRequest(request
);
138 /* The request will countDown the checkpoints latch once it's finished */
142 public String
getSSID() {
147 public long getStartTime() {
148 return fInnerHistory
.getStartTime();
152 public long getEndTime() {
157 public void insertPastState(long stateStartTime
, long stateEndTime
,
158 int quark
, ITmfStateValue value
) throws TimeRangeException
{
159 waitForCheckpoints();
161 /* Update the latest time */
162 if (stateEndTime
> fLatestTime
) {
163 fLatestTime
= stateEndTime
;
167 * Check if the interval intersects the previous checkpoint. If so,
168 * insert it in the real history back-end.
170 * FIXME since intervals are inserted in order of rank, we could avoid
171 * doing a map lookup every time here (just compare with the known
174 if (stateStartTime
<= fCheckpoints
.floorKey(stateEndTime
)) {
175 fInnerHistory
.insertPastState(stateStartTime
, stateEndTime
, quark
, value
);
180 public void finishedBuilding(long endTime
) throws TimeRangeException
{
181 fInnerHistory
.finishedBuilding(endTime
);
185 public FileInputStream
supplyAttributeTreeReader() {
186 return fInnerHistory
.supplyAttributeTreeReader();
190 public File
supplyAttributeTreeWriterFile() {
191 return fInnerHistory
.supplyAttributeTreeWriterFile();
195 public long supplyAttributeTreeWriterFilePosition() {
196 return fInnerHistory
.supplyAttributeTreeWriterFilePosition();
200 public void removeFiles() {
201 fInnerHistory
.removeFiles();
205 public void dispose() {
206 fPartialInput
.dispose();
207 fPartialSS
.dispose();
208 fInnerHistory
.dispose();
212 public void doQuery(List
<@Nullable ITmfStateInterval
> currentStateInfo
, long t
)
213 throws TimeRangeException
, StateSystemDisposedException
{
214 /* Wait for required steps to be done */
215 waitForCheckpoints();
216 fPartialSS
.getUpstreamSS().waitUntilBuilt();
218 if (!checkValidTime(t
)) {
219 throw new TimeRangeException(fSSID
+ " Time:" + t
+ ", Start:" + getStartTime() + ", End:" + getEndTime()); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
222 /* Reload the previous checkpoint */
223 long checkpointTime
= fCheckpoints
.floorKey(t
);
224 fInnerHistory
.doQuery(currentStateInfo
, checkpointTime
);
227 * Set the initial contents of the partial state system (which is the
228 * contents of the query at the checkpoint).
230 List
<@NonNull ITmfStateInterval
> filledStateInfo
=
231 checkNotNullContents(currentStateInfo
.stream()).collect(Collectors
.toList());
233 fPartialSS
.takeQueryLock();
234 fPartialSS
.replaceOngoingState(filledStateInfo
);
236 /* Send an event request to update the state system to the target time. */
237 TmfTimeRange range
= new TmfTimeRange(
239 * The state at the checkpoint already includes any state change
240 * caused by the event(s) happening exactly at 'checkpointTime',
241 * if any. We must not include those events in the query.
243 new TmfTimestamp(checkpointTime
+ 1, ITmfTimestamp
.NANOSECOND_SCALE
),
244 new TmfTimestamp(t
, ITmfTimestamp
.NANOSECOND_SCALE
));
245 ITmfEventRequest request
= new PartialStateSystemRequest(fPartialInput
, range
);
246 fPartialInput
.getTrace().sendRequest(request
);
249 request
.waitForCompletion();
250 } catch (InterruptedException e
) {
255 * Now the partial state system should have the ongoing time we are
256 * looking for. However, the method expects a List of *state intervals*,
257 * not state values, so we'll create intervals with a dummy end time.
260 for (int i
= 0; i
< currentStateInfo
.size(); i
++) {
262 start
= ((ITmfStateSystem
) fPartialSS
).getOngoingStartTime(i
);
263 ITmfStateValue val
= ((ITmfStateSystem
) fPartialSS
).queryOngoingState(i
);
265 ITmfStateInterval interval
= new TmfStateInterval(start
, t
, i
, checkNotNull(val
));
266 currentStateInfo
.set(i
, interval
);
268 } catch (AttributeNotFoundException e
) {
269 /* Should not happen, we iterate over existing values. */
273 fPartialSS
.releaseQueryLock();
277 * Single queries are not supported in partial histories. To get the same
278 * result you can do a full query, then call fullState.get(attribute).
281 public ITmfStateInterval
doSingularQuery(long t
, int attributeQuark
) {
282 throw new UnsupportedOperationException();
285 private boolean checkValidTime(long t
) {
286 return (t
>= getStartTime() && t
<= getEndTime());
290 public void debugPrint(PrintWriter writer
) {
291 // TODO Auto-generated method stub
294 private void waitForCheckpoints() {
296 fCheckpointsReady
.await();
297 } catch (InterruptedException e
) {
// ------------------------------------------------------------------------
// Event request types
// ------------------------------------------------------------------------
306 private class CheckpointsRequest
extends TmfEventRequest
{
307 private final ITmfTrace trace
;
308 private final Map
<Long
, Long
> checkpts
;
309 private long eventCount
;
310 private long lastCheckpointAt
;
312 public CheckpointsRequest(ITmfStateProvider input
, Map
<Long
, Long
> checkpoints
) {
313 super(ITmfEvent
.class,
314 TmfTimeRange
.ETERNITY
,
316 ITmfEventRequest
.ALL_DATA
,
317 ITmfEventRequest
.ExecutionType
.FOREGROUND
);
319 this.trace
= input
.getTrace();
320 this.checkpts
= checkpoints
;
322 lastCheckpointAt
= 0;
324 /* Insert a checkpoint at the start of the trace */
325 checkpoints
.put(input
.getStartTime(), 0L);
329 public void handleData(final ITmfEvent event
) {
330 super.handleData(event
);
331 if (event
.getTrace() == trace
) {
334 /* Check if we need to register a new checkpoint */
335 if (eventCount
>= lastCheckpointAt
+ fGranularity
) {
336 checkpts
.put(event
.getTimestamp().getValue(), eventCount
);
337 lastCheckpointAt
= eventCount
;
343 public void handleCompleted() {
344 super.handleCompleted();
345 fCheckpointsReady
.countDown();
349 private class PartialStateSystemRequest
extends TmfEventRequest
{
350 private final ITmfStateProvider sci
;
351 private final ITmfTrace trace
;
353 PartialStateSystemRequest(ITmfStateProvider sci
, TmfTimeRange range
) {
354 super(ITmfEvent
.class,
357 ITmfEventRequest
.ALL_DATA
,
358 ITmfEventRequest
.ExecutionType
.BACKGROUND
);
360 this.trace
= sci
.getTrace();
364 public void handleData(final ITmfEvent event
) {
365 super.handleData(event
);
366 if (event
.getTrace() == trace
) {
367 sci
.processEvent(event
);
372 public void handleCompleted() {
374 * If we're using a threaded state provider, we need to make sure
375 * all events have been handled by the state system before doing
378 if (fPartialInput
instanceof AbstractTmfStateProvider
) {
379 ((AbstractTmfStateProvider
) fPartialInput
).waitForEmptyQueue();
381 super.handleCompleted();