e9b269c014fc61454fa57195c7e4af1b81664ee4
[deliverable/tracecompass.git] / tmf / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / internal / tmf / core / statesystem / backends / partial / PartialHistoryBackend.java
1 /*******************************************************************************
2 * Copyright (c) 2013, 2015 Ericsson
3 * All rights reserved. This program and the accompanying materials are
4 * made available under the terms of the Eclipse Public License v1.0 which
5 * accompanies this distribution, and is available at
6 * http://www.eclipse.org/legal/epl-v10.html
7 *
8 * Contributors:
9 * Alexandre Montplaisir - Initial API and implementation
10 * Patrick Tasse - Add message to exceptions
11 *******************************************************************************/
12
13 package org.eclipse.tracecompass.internal.tmf.core.statesystem.backends.partial;
14
15 import java.io.File;
16 import java.io.FileInputStream;
17 import java.io.PrintWriter;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.TreeMap;
21 import java.util.concurrent.CountDownLatch;
22
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.eclipse.tracecompass.statesystem.core.ITmfStateSystem;
25 import org.eclipse.tracecompass.statesystem.core.backend.IStateHistoryBackend;
26 import org.eclipse.tracecompass.statesystem.core.exceptions.AttributeNotFoundException;
27 import org.eclipse.tracecompass.statesystem.core.exceptions.StateSystemDisposedException;
28 import org.eclipse.tracecompass.statesystem.core.exceptions.TimeRangeException;
29 import org.eclipse.tracecompass.statesystem.core.interval.ITmfStateInterval;
30 import org.eclipse.tracecompass.statesystem.core.interval.TmfStateInterval;
31 import org.eclipse.tracecompass.statesystem.core.statevalue.ITmfStateValue;
32 import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
33 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
34 import org.eclipse.tracecompass.tmf.core.request.TmfEventRequest;
35 import org.eclipse.tracecompass.tmf.core.statesystem.AbstractTmfStateProvider;
36 import org.eclipse.tracecompass.tmf.core.statesystem.ITmfStateProvider;
37 import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
38 import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimeRange;
39 import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimestamp;
40 import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace;
41
42 /**
43 * Partial state history back-end.
44 *
45 * This is a shim inserted between the real state system and a "real" history
46 * back-end. It will keep checkpoints, every n trace events (where n is called
47 * the granularity) and will only forward to the real state history the state
48 * intervals that cross at least one checkpoint. Every other interval will
49 * be discarded.
50 *
51 * This would mean that it can only answer queries exactly at the checkpoints.
52 * For any other timestamps (ie, most of the time), it will load the closest
53 * earlier checkpoint, and will re-feed the state-change-input with events from
54 * the trace, to restore the real state at the time that was requested.
55 *
56 * @author Alexandre Montplaisir
57 */
58 public class PartialHistoryBackend implements IStateHistoryBackend {
59
60 private final @NonNull String fSSID;
61
62 /**
63 * A partial history needs the state input plugin to re-generate state
64 * between checkpoints.
65 */
66 private final @NonNull ITmfStateProvider fPartialInput;
67
68 /**
69 * Fake state system that is used for partially rebuilding the states (when
70 * going from a checkpoint to a target query timestamp).
71 */
72 private final @NonNull PartialStateSystem fPartialSS;
73
74 /** Reference to the "real" state history that is used for storage */
75 private final @NonNull IStateHistoryBackend fInnerHistory;
76
77 /** Checkpoints map, <Timestamp, Rank in the trace> */
78 private final @NonNull TreeMap<Long, Long> fCheckpoints = new TreeMap<>();
79
80 /** Latch tracking if the initial checkpoint registration is done */
81 private final @NonNull CountDownLatch fCheckpointsReady = new CountDownLatch(1);
82
83 private final long fGranularity;
84
85 private long fLatestTime;
86
/**
 * Constructor
 *
 * @param ssid
 *            The state system's ID
 * @param partialInput
 *            The state change input object that was used to build the
 *            upstream state system. This partial history will make its own
 *            copy (since they have different targets).
 * @param pss
 *            The partial history's inner state system. It should already be
 *            assigned to partialInput.
 * @param realBackend
 *            The real state history back-end to use. It's supposed to be
 *            modular, so it should be able to be of any type.
 * @param granularity
 *            Configuration parameter indicating how many trace events there
 *            should be between each checkpoint
 * @throws IllegalArgumentException
 *             If the granularity is not strictly positive, if partialInput,
 *             pss or realBackend is null, or if pss is not the state system
 *             assigned to partialInput
 */
public PartialHistoryBackend(@NonNull String ssid,
        ITmfStateProvider partialInput,
        PartialStateSystem pss,
        IStateHistoryBackend realBackend,
        long granularity) {
    if (granularity <= 0) {
        throw new IllegalArgumentException("Granularity must be strictly positive, got " + granularity); //$NON-NLS-1$
    }
    if (partialInput == null || pss == null || realBackend == null) {
        /* A null realBackend used to surface as a bare NullPointerException */
        throw new IllegalArgumentException("State provider, partial state system and real back-end must not be null"); //$NON-NLS-1$
    }
    if (partialInput.getAssignedStateSystem() != pss) {
        throw new IllegalArgumentException("The partial state system is not the one assigned to the state provider"); //$NON-NLS-1$
    }

    fSSID = ssid;
    fPartialInput = partialInput;
    fPartialSS = pss;

    fInnerHistory = realBackend;
    fGranularity = granularity;

    /* Until intervals are inserted, the history ends where it starts */
    fLatestTime = realBackend.getStartTime();

    /*
     * Keep this call last: the checkpoints request reads fGranularity and
     * fCheckpointsReady from this instance, so every field has to be
     * assigned before the request is sent.
     */
    registerCheckpoints();
}
129
130 private void registerCheckpoints() {
131 ITmfEventRequest request = new CheckpointsRequest(fPartialInput, fCheckpoints);
132 fPartialInput.getTrace().sendRequest(request);
133 /* The request will countDown the checkpoints latch once it's finished */
134 }
135
136 @Override
137 public String getSSID() {
138 return fSSID;
139 }
140
141 @Override
142 public long getStartTime() {
143 return fInnerHistory.getStartTime();
144 }
145
146 @Override
147 public long getEndTime() {
148 return fLatestTime;
149 }
150
151 @Override
152 public void insertPastState(long stateStartTime, long stateEndTime,
153 int quark, ITmfStateValue value) throws TimeRangeException {
154 waitForCheckpoints();
155
156 /* Update the latest time */
157 if (stateEndTime > fLatestTime) {
158 fLatestTime = stateEndTime;
159 }
160
161 /*
162 * Check if the interval intersects the previous checkpoint. If so,
163 * insert it in the real history back-end.
164 *
165 * FIXME since intervals are inserted in order of rank, we could avoid
166 * doing a map lookup every time here (just compare with the known
167 * previous one).
168 */
169 if (stateStartTime <= fCheckpoints.floorKey(stateEndTime)) {
170 fInnerHistory.insertPastState(stateStartTime, stateEndTime, quark, value);
171 }
172 }
173
174 @Override
175 public void finishedBuilding(long endTime) throws TimeRangeException {
176 fInnerHistory.finishedBuilding(endTime);
177 }
178
179 @Override
180 public FileInputStream supplyAttributeTreeReader() {
181 return fInnerHistory.supplyAttributeTreeReader();
182 }
183
184 @Override
185 public File supplyAttributeTreeWriterFile() {
186 return fInnerHistory.supplyAttributeTreeWriterFile();
187 }
188
189 @Override
190 public long supplyAttributeTreeWriterFilePosition() {
191 return fInnerHistory.supplyAttributeTreeWriterFilePosition();
192 }
193
194 @Override
195 public void removeFiles() {
196 fInnerHistory.removeFiles();
197 }
198
199 @Override
200 public void dispose() {
201 fPartialInput.dispose();
202 fPartialSS.dispose();
203 fInnerHistory.dispose();
204 }
205
206 @Override
207 public void doQuery(List<ITmfStateInterval> currentStateInfo, long t)
208 throws TimeRangeException, StateSystemDisposedException {
209 /* Wait for required steps to be done */
210 waitForCheckpoints();
211 fPartialSS.getUpstreamSS().waitUntilBuilt();
212
213 if (!checkValidTime(t)) {
214 throw new TimeRangeException(fSSID + " Time:" + t + ", Start:" + getStartTime() + ", End:" + getEndTime()); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
215 }
216
217 /* Reload the previous checkpoint */
218 long checkpointTime = fCheckpoints.floorKey(t);
219 fInnerHistory.doQuery(currentStateInfo, checkpointTime);
220
221 /*
222 * Set the initial contents of the partial state system (which is the
223 * contents of the query at the checkpoint).
224 */
225 fPartialSS.takeQueryLock();
226 fPartialSS.replaceOngoingState(currentStateInfo);
227
228 /* Send an event request to update the state system to the target time. */
229 TmfTimeRange range = new TmfTimeRange(
230 /*
231 * The state at the checkpoint already includes any state change
232 * caused by the event(s) happening exactly at 'checkpointTime',
233 * if any. We must not include those events in the query.
234 */
235 new TmfTimestamp(checkpointTime + 1, ITmfTimestamp.NANOSECOND_SCALE),
236 new TmfTimestamp(t, ITmfTimestamp.NANOSECOND_SCALE));
237 ITmfEventRequest request = new PartialStateSystemRequest(fPartialInput, range);
238 fPartialInput.getTrace().sendRequest(request);
239
240 try {
241 request.waitForCompletion();
242 } catch (InterruptedException e) {
243 e.printStackTrace();
244 }
245
246 /*
247 * Now the partial state system should have the ongoing time we are
248 * looking for. However, the method expects a List of *state intervals*,
249 * not state values, so we'll create intervals with a dummy end time.
250 */
251 try {
252 for (int i = 0; i < currentStateInfo.size(); i++) {
253 long start = 0;
254 ITmfStateValue val = null;
255 start = ((ITmfStateSystem) fPartialSS).getOngoingStartTime(i);
256 val = ((ITmfStateSystem) fPartialSS).queryOngoingState(i);
257
258 ITmfStateInterval interval = new TmfStateInterval(start, t, i, val);
259 currentStateInfo.set(i, interval);
260 }
261 } catch (AttributeNotFoundException e) {
262 /* Should not happen, we iterate over existing values. */
263 e.printStackTrace();
264 }
265
266 fPartialSS.releaseQueryLock();
267 }
268
269 /**
270 * Single queries are not supported in partial histories. To get the same
271 * result you can do a full query, then call fullState.get(attribute).
272 */
273 @Override
274 public ITmfStateInterval doSingularQuery(long t, int attributeQuark) {
275 throw new UnsupportedOperationException();
276 }
277
278 private boolean checkValidTime(long t) {
279 return (t >= getStartTime() && t <= getEndTime());
280 }
281
282 @Override
283 public void debugPrint(PrintWriter writer) {
284 // TODO Auto-generated method stub
285 }
286
287 private void waitForCheckpoints() {
288 try {
289 fCheckpointsReady.await();
290 } catch (InterruptedException e) {
291 e.printStackTrace();
292 }
293 }
294
295 // ------------------------------------------------------------------------
296 // Event requests types
297 // ------------------------------------------------------------------------
298
299 private class CheckpointsRequest extends TmfEventRequest {
300 private final ITmfTrace trace;
301 private final Map<Long, Long> checkpts;
302 private long eventCount;
303 private long lastCheckpointAt;
304
305 public CheckpointsRequest(ITmfStateProvider input, Map<Long, Long> checkpoints) {
306 super(ITmfEvent.class,
307 TmfTimeRange.ETERNITY,
308 0,
309 ITmfEventRequest.ALL_DATA,
310 ITmfEventRequest.ExecutionType.FOREGROUND);
311 checkpoints.clear();
312 this.trace = input.getTrace();
313 this.checkpts = checkpoints;
314 eventCount = 0;
315 lastCheckpointAt = 0;
316
317 /* Insert a checkpoint at the start of the trace */
318 checkpoints.put(input.getStartTime(), 0L);
319 }
320
321 @Override
322 public void handleData(final ITmfEvent event) {
323 super.handleData(event);
324 if (event.getTrace() == trace) {
325 eventCount++;
326
327 /* Check if we need to register a new checkpoint */
328 if (eventCount >= lastCheckpointAt + fGranularity) {
329 checkpts.put(event.getTimestamp().getValue(), eventCount);
330 lastCheckpointAt = eventCount;
331 }
332 }
333 }
334
335 @Override
336 public void handleCompleted() {
337 super.handleCompleted();
338 fCheckpointsReady.countDown();
339 }
340 }
341
342 private class PartialStateSystemRequest extends TmfEventRequest {
343 private final ITmfStateProvider sci;
344 private final ITmfTrace trace;
345
346 PartialStateSystemRequest(ITmfStateProvider sci, TmfTimeRange range) {
347 super(ITmfEvent.class,
348 range,
349 0,
350 ITmfEventRequest.ALL_DATA,
351 ITmfEventRequest.ExecutionType.BACKGROUND);
352 this.sci = sci;
353 this.trace = sci.getTrace();
354 }
355
356 @Override
357 public void handleData(final ITmfEvent event) {
358 super.handleData(event);
359 if (event.getTrace() == trace) {
360 sci.processEvent(event);
361 }
362 }
363
364 @Override
365 public void handleCompleted() {
366 /*
367 * If we're using a threaded state provider, we need to make sure
368 * all events have been handled by the state system before doing
369 * queries on it.
370 */
371 if (fPartialInput instanceof AbstractTmfStateProvider) {
372 ((AbstractTmfStateProvider) fPartialInput).waitForEmptyQueue();
373 }
374 super.handleCompleted();
375 }
376
377 }
378 }
This page took 0.039573 seconds and 4 git commands to generate.