Commit | Line | Data |
---|---|---|
8c8bf09f | 1 | /******************************************************************************* |
e31e01e8 | 2 | * Copyright (c) 2009, 2010 Ericsson |
8c8bf09f ASL |
3 | * |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Francois Chouinard - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
13 | package org.eclipse.linuxtools.tmf.trace; | |
14 | ||
b0a282fb | 15 | import java.io.File; |
62d1696a | 16 | import java.io.FileNotFoundException; |
62d1696a | 17 | import java.util.Collections; |
8c8bf09f ASL |
18 | import java.util.Vector; |
19 | ||
fc6ccf6f | 20 | import org.eclipse.linuxtools.tmf.component.TmfEventProvider; |
8c8bf09f ASL |
21 | import org.eclipse.linuxtools.tmf.event.TmfEvent; |
22 | import org.eclipse.linuxtools.tmf.event.TmfTimeRange; | |
23 | import org.eclipse.linuxtools.tmf.event.TmfTimestamp; | |
2fb2eb37 | 24 | import org.eclipse.linuxtools.tmf.request.ITmfDataRequest; |
64267c9d | 25 | import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType; |
2fb2eb37 | 26 | import org.eclipse.linuxtools.tmf.request.ITmfEventRequest; |
83e13355 FC |
27 | import org.eclipse.linuxtools.tmf.request.TmfDataRequest; |
28 | import org.eclipse.linuxtools.tmf.request.TmfEventRequest; | |
29 | import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler; | |
30 | import org.eclipse.linuxtools.tmf.signal.TmfTraceOpenedSignal; | |
31 | import org.eclipse.linuxtools.tmf.signal.TmfTraceUpdatedSignal; | |
8c8bf09f ASL |
32 | |
33 | /** | |
146a887c | 34 | * <b><u>TmfTrace</u></b> |
8c8bf09f | 35 | * <p> |
146a887c FC |
36 | * Abstract implementation of ITmfTrace. It should be sufficient to extend this |
37 | * class and provide implementation for <code>getCurrentLocation()</code> and | |
38 | * <code>seekLocation()</code>, as well as a proper parser, to have a working | |
4e3aa37d | 39 | * concrete implementation. |
ff4ed569 | 40 | * <p> |
54d55ced | 41 | * Note: The notion of event rank is still under heavy discussion. Although |
ff4ed569 | 42 | * used by the Events View and probably useful in the general case, there |
54d55ced | 43 | * is no easy way to implement it for LTTng (actually a strong case is being |
ff4ed569 FC |
44 | * made that this is useless). |
45 | * <p> | |
46 | * That it is not supported by LTTng does by no mean indicate that it is not | |
47 | * useful for (just about) every other tracing tool. Therefore, this class | |
48 | * provides a minimal (and partial) implementation of rank. However, the current | |
49 | * implementation should not be relied on in the general case. | |
54d55ced | 50 | * |
4e3aa37d | 51 | * TODO: Add support for live streaming (notifications, incremental indexing, ...) |
8c8bf09f | 52 | */ |
ff4ed569 | 53 | public abstract class TmfTrace<T extends TmfEvent> extends TmfEventProvider<T> implements ITmfTrace, Cloneable { |
62d1696a | 54 | |
e31e01e8 | 55 | // ------------------------------------------------------------------------ |
62d1696a | 56 | // Constants |
e31e01e8 | 57 | // ------------------------------------------------------------------------ |
62d1696a FC |
58 | |
59 | // The default number of events to cache | |
e31e01e8 | 60 | // TODO: Make the DEFAULT_CACHE_SIZE a preference |
b12f4544 | 61 | public static final int DEFAULT_INDEX_PAGE_SIZE = 50000; |
8c8bf09f | 62 | |
e31e01e8 | 63 | // ------------------------------------------------------------------------ |
8c8bf09f | 64 | // Attributes |
e31e01e8 | 65 | // ------------------------------------------------------------------------ |
8c8bf09f | 66 | |
b0a282fb FC |
67 | // The trace path |
68 | private final String fPath; | |
69 | ||
8d2e2848 | 70 | // The cache page size AND checkpoints interval |
9f584e4c | 71 | protected int fIndexPageSize; |
62d1696a FC |
72 | |
73 | // The set of event stream checkpoints (for random access) | |
9f584e4c | 74 | protected Vector<TmfCheckpoint> fCheckpoints = new Vector<TmfCheckpoint>(); |
62d1696a FC |
75 | |
76 | // The number of events collected | |
a3fe52fc | 77 | protected long fNbEvents = 0; |
62d1696a FC |
78 | |
79 | // The time span of the event stream | |
cb866e08 FC |
80 | private TmfTimestamp fStartTime = TmfTimestamp.BigCrunch; |
81 | private TmfTimestamp fEndTime = TmfTimestamp.BigBang; | |
62d1696a | 82 | |
e31e01e8 | 83 | // ------------------------------------------------------------------------ |
50adc88e | 84 | // Constructors |
e31e01e8 | 85 | // ------------------------------------------------------------------------ |
8c8bf09f | 86 | |
ff4ed569 FC |
87 | /** |
88 | * @param path | |
89 | * @throws FileNotFoundException | |
90 | */ | |
ce785d7d | 91 | protected TmfTrace(String name, Class<T> type, String path) throws FileNotFoundException { |
664902f7 | 92 | this(name, type, path, DEFAULT_INDEX_PAGE_SIZE); |
ff4ed569 FC |
93 | } |
94 | ||
62d1696a | 95 | /** |
e31e01e8 FC |
96 | * @param path |
97 | * @param cacheSize | |
62d1696a FC |
98 | * @throws FileNotFoundException |
99 | */ | |
ce785d7d FC |
100 | protected TmfTrace(String name, Class<T> type, String path, int cacheSize) throws FileNotFoundException { |
101 | super(name, type); | |
b0a282fb | 102 | int sep = path.lastIndexOf(File.separator); |
ce785d7d FC |
103 | String simpleName = (sep >= 0) ? path.substring(sep + 1) : path; |
104 | setName(simpleName); | |
b0a282fb | 105 | fPath = path; |
664902f7 | 106 | fIndexPageSize = (cacheSize > 0) ? cacheSize : DEFAULT_INDEX_PAGE_SIZE; |
8c8bf09f ASL |
107 | } |
108 | ||
ff4ed569 FC |
109 | /* (non-Javadoc) |
110 | * @see java.lang.Object#clone() | |
62d1696a | 111 | */ |
ff4ed569 FC |
112 | @SuppressWarnings("unchecked") |
113 | @Override | |
114 | public TmfTrace<T> clone() throws CloneNotSupportedException { | |
115 | TmfTrace<T> clone = (TmfTrace<T>) super.clone(); | |
cb866e08 FC |
116 | clone.fCheckpoints = (Vector<TmfCheckpoint>) fCheckpoints; |
117 | clone.fStartTime = new TmfTimestamp(fStartTime); | |
118 | clone.fEndTime = new TmfTimestamp(fEndTime); | |
ff4ed569 | 119 | return clone; |
8c8bf09f ASL |
120 | } |
121 | ||
e31e01e8 | 122 | // ------------------------------------------------------------------------ |
8c8bf09f | 123 | // Accessors |
e31e01e8 | 124 | // ------------------------------------------------------------------------ |
8c8bf09f | 125 | |
62d1696a | 126 | /** |
b0a282fb | 127 | * @return the trace path |
62d1696a | 128 | */ |
d4011df2 FC |
129 | @Override |
130 | public String getPath() { | |
b0a282fb | 131 | return fPath; |
8c8bf09f ASL |
132 | } |
133 | ||
62d1696a FC |
134 | /* (non-Javadoc) |
135 | * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getNbEvents() | |
136 | */ | |
d4011df2 FC |
137 | @Override |
138 | public long getNbEvents() { | |
62d1696a | 139 | return fNbEvents; |
8c8bf09f ASL |
140 | } |
141 | ||
b0a282fb FC |
142 | /** |
143 | * @return the size of the cache | |
144 | */ | |
d4011df2 FC |
145 | @Override |
146 | public int getCacheSize() { | |
9f584e4c | 147 | return fIndexPageSize; |
b0a282fb FC |
148 | } |
149 | ||
62d1696a FC |
150 | /* (non-Javadoc) |
151 | * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getTimeRange() | |
152 | */ | |
d4011df2 FC |
153 | @Override |
154 | public TmfTimeRange getTimeRange() { | |
cb866e08 | 155 | return new TmfTimeRange(fStartTime, fEndTime); |
8c8bf09f ASL |
156 | } |
157 | ||
e31e01e8 FC |
158 | /* (non-Javadoc) |
159 | * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getStartTime() | |
160 | */ | |
d4011df2 FC |
161 | @Override |
162 | public TmfTimestamp getStartTime() { | |
cb866e08 | 163 | return fStartTime; |
146a887c FC |
164 | } |
165 | ||
e31e01e8 FC |
166 | /* (non-Javadoc) |
167 | * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getEndTime() | |
168 | */ | |
d4011df2 FC |
169 | @Override |
170 | public TmfTimestamp getEndTime() { | |
cb866e08 | 171 | return fEndTime; |
146a887c FC |
172 | } |
173 | ||
ff4ed569 FC |
174 | @SuppressWarnings("unchecked") |
175 | public Vector<TmfCheckpoint> getCheckpoints() { | |
176 | return (Vector<TmfCheckpoint>) fCheckpoints.clone(); | |
54d55ced FC |
177 | } |
178 | ||
abfad0aa FC |
179 | /** |
180 | * Returns the rank of the first event with the requested timestamp. | |
181 | * If none, returns the index of the next event (if any). | |
182 | * | |
183 | * @param timestamp | |
184 | * @return | |
185 | */ | |
d4011df2 FC |
186 | @Override |
187 | public long getRank(TmfTimestamp timestamp) { | |
abfad0aa FC |
188 | TmfContext context = seekEvent(timestamp); |
189 | return context.getRank(); | |
190 | } | |
191 | ||
e31e01e8 | 192 | // ------------------------------------------------------------------------ |
8c8bf09f | 193 | // Operators |
e31e01e8 | 194 | // ------------------------------------------------------------------------ |
8c8bf09f | 195 | |
4e3aa37d | 196 | protected void setTimeRange(TmfTimeRange range) { |
cb866e08 FC |
197 | fStartTime = range.getStartTime(); |
198 | fEndTime = range.getEndTime(); | |
4e3aa37d FC |
199 | } |
200 | ||
201 | protected void setStartTime(TmfTimestamp startTime) { | |
cb866e08 | 202 | fStartTime = startTime; |
4e3aa37d FC |
203 | } |
204 | ||
205 | protected void setEndTime(TmfTimestamp endTime) { | |
cb866e08 | 206 | fEndTime = endTime; |
4e3aa37d FC |
207 | } |
208 | ||
e31e01e8 FC |
209 | // ------------------------------------------------------------------------ |
210 | // TmfProvider | |
211 | // ------------------------------------------------------------------------ | |
212 | ||
213 | @Override | |
2fb2eb37 FC |
214 | public ITmfContext armRequest(ITmfDataRequest<T> request) { |
215 | if (request instanceof ITmfEventRequest<?>) { | |
216 | return seekEvent(((ITmfEventRequest<T>) request).getRange().getStartTime()); | |
e31e01e8 | 217 | } |
ff4ed569 | 218 | return seekEvent(request.getIndex()); |
e31e01e8 FC |
219 | } |
220 | ||
221 | /** | |
222 | * Return the next piece of data based on the context supplied. The context | |
223 | * would typically be updated for the subsequent read. | |
224 | * | |
225 | * @param context | |
226 | * @return | |
227 | */ | |
228 | @SuppressWarnings("unchecked") | |
229 | @Override | |
230 | public T getNext(ITmfContext context) { | |
9f584e4c FC |
231 | if (context instanceof TmfContext) { |
232 | return (T) getNextEvent((TmfContext) context); | |
e31e01e8 FC |
233 | } |
234 | return null; | |
235 | } | |
236 | ||
e31e01e8 FC |
237 | // ------------------------------------------------------------------------ |
238 | // ITmfTrace | |
239 | // ------------------------------------------------------------------------ | |
240 | ||
146a887c FC |
241 | /* (non-Javadoc) |
242 | * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools.tmf.event.TmfTimestamp) | |
243 | */ | |
d4011df2 FC |
244 | @Override |
245 | public TmfContext seekEvent(TmfTimestamp timestamp) { | |
62d1696a | 246 | |
4e3aa37d FC |
247 | if (timestamp == null) { |
248 | timestamp = TmfTimestamp.BigBang; | |
249 | } | |
250 | ||
251 | // First, find the right checkpoint | |
9f584e4c | 252 | int index = Collections.binarySearch(fCheckpoints, new TmfCheckpoint(timestamp, null)); |
62d1696a | 253 | |
8d2e2848 | 254 | // In the very likely case that the checkpoint was not found, bsearch |
62d1696a FC |
255 | // returns its negated would-be location (not an offset...). From that |
256 | // index, we can then position the stream and get the event. | |
257 | if (index < 0) { | |
258 | index = Math.max(0, -(index + 2)); | |
259 | } | |
260 | ||
261 | // Position the stream at the checkpoint | |
452ad365 | 262 | ITmfLocation<?> location; |
e31e01e8 FC |
263 | synchronized (fCheckpoints) { |
264 | if (fCheckpoints.size() > 0) { | |
265 | if (index >= fCheckpoints.size()) { | |
266 | index = fCheckpoints.size() - 1; | |
267 | } | |
268 | location = fCheckpoints.elementAt(index).getLocation(); | |
269 | } | |
270 | else { | |
271 | location = null; | |
272 | } | |
8d2e2848 | 273 | } |
54d55ced FC |
274 | TmfContext context = seekLocation(location); |
275 | context.setRank(index * fIndexPageSize); | |
62d1696a | 276 | |
54d55ced | 277 | // And locate the event |
ff4ed569 | 278 | TmfContext nextEventContext = context.clone(); // Must use clone() to get the right subtype... |
62d1696a FC |
279 | TmfEvent event = getNextEvent(nextEventContext); |
280 | while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) { | |
54d55ced FC |
281 | context.setLocation(nextEventContext.getLocation().clone()); |
282 | context.updateRank(1); | |
62d1696a FC |
283 | event = getNextEvent(nextEventContext); |
284 | } | |
285 | ||
54d55ced | 286 | return context; |
62d1696a FC |
287 | } |
288 | ||
146a887c FC |
289 | /* (non-Javadoc) |
290 | * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(int) | |
291 | */ | |
d4011df2 FC |
292 | @Override |
293 | public TmfContext seekEvent(long rank) { | |
62d1696a FC |
294 | |
295 | // Position the stream at the previous checkpoint | |
9f584e4c | 296 | int index = (int) rank / fIndexPageSize; |
452ad365 | 297 | ITmfLocation<?> location; |
e31e01e8 | 298 | synchronized (fCheckpoints) { |
54d55ced FC |
299 | if (fCheckpoints.size() == 0) { |
300 | location = null; | |
301 | } | |
302 | else { | |
e31e01e8 | 303 | if (index >= fCheckpoints.size()) { |
54d55ced | 304 | index = fCheckpoints.size() - 1; |
e31e01e8 FC |
305 | } |
306 | location = fCheckpoints.elementAt(index).getLocation(); | |
307 | } | |
8d2e2848 | 308 | } |
54d55ced | 309 | |
9f584e4c FC |
310 | TmfContext context = seekLocation(location); |
311 | long pos = index * fIndexPageSize; | |
312 | context.setRank(pos); | |
e31e01e8 | 313 | |
9f584e4c | 314 | if (pos < rank) { |
e31e01e8 | 315 | TmfEvent event = getNextEvent(context); |
9f584e4c | 316 | while (event != null && ++pos < rank) { |
e31e01e8 FC |
317 | event = getNextEvent(context); |
318 | } | |
165c977c | 319 | } |
62d1696a | 320 | |
8f50c396 | 321 | return context; |
8c8bf09f ASL |
322 | } |
323 | ||
146a887c FC |
324 | /* (non-Javadoc) |
325 | * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getNextEvent(org.eclipse.linuxtools.tmf.trace.ITmfTrace.TraceContext) | |
326 | */ | |
d4011df2 | 327 | @Override |
9f584e4c | 328 | public synchronized TmfEvent getNextEvent(TmfContext context) { |
e31e01e8 | 329 | // parseEvent() does not update the context |
cc6eec3e | 330 | TmfEvent event = parseEvent(context); |
4e3aa37d | 331 | if (event != null) { |
550d787e | 332 | updateIndex(context, context.getRank(), event.getTimestamp()); |
cb866e08 | 333 | context.setLocation(getCurrentLocation()); |
54d55ced | 334 | context.updateRank(1); |
4e3aa37d FC |
335 | processEvent(event); |
336 | } | |
146a887c FC |
337 | return event; |
338 | } | |
8c8bf09f | 339 | |
cb866e08 FC |
340 | protected synchronized void updateIndex(ITmfContext context, long rank, TmfTimestamp timestamp) { |
341 | if (fStartTime.compareTo(timestamp, false) > 0) fStartTime = timestamp; | |
342 | if (fEndTime.compareTo(timestamp, false) < 0) fEndTime = timestamp; | |
343 | if (context.isValidRank()) { | |
344 | if (fNbEvents <= rank) | |
345 | fNbEvents = rank + 1; | |
346 | // Build the index as we go along | |
347 | if ((rank % fIndexPageSize) == 0) { | |
348 | // Determine the table position | |
349 | long position = rank / fIndexPageSize; | |
350 | // Add new entry at proper location (if empty) | |
351 | if (fCheckpoints.size() == position) { | |
352 | ITmfLocation<?> location = context.getLocation().clone(); | |
1a971e96 | 353 | fCheckpoints.add(new TmfCheckpoint(timestamp.clone(), location)); |
cb866e08 FC |
354 | // System.out.println(getName() + "[" + (fCheckpoints.size() - 1) + "] " + timestamp + ", " + location.toString()); |
355 | } | |
550d787e FC |
356 | } |
357 | } | |
358 | } | |
359 | ||
4e3aa37d | 360 | /** |
e31e01e8 FC |
361 | * Hook for "special" processing by the concrete class |
362 | * (called by getNextEvent()) | |
363 | * | |
146a887c FC |
364 | * @param event |
365 | */ | |
ff4ed569 | 366 | protected void processEvent(TmfEvent event) { |
146a887c | 367 | // Do nothing by default |
62d1696a | 368 | } |
4e3aa37d | 369 | |
e31e01e8 FC |
370 | /** |
371 | * To be implemented by the concrete class | |
4e3aa37d | 372 | */ |
d4011df2 FC |
373 | @Override |
374 | public abstract TmfContext seekLocation(ITmfLocation<?> location); | |
452ad365 | 375 | public abstract ITmfLocation<?> getCurrentLocation(); |
d4011df2 FC |
376 | @Override |
377 | public abstract TmfEvent parseEvent(TmfContext context); | |
4e3aa37d | 378 | |
e31e01e8 FC |
379 | // ------------------------------------------------------------------------ |
380 | // toString | |
381 | // ------------------------------------------------------------------------ | |
8d2e2848 FC |
382 | |
383 | /* (non-Javadoc) | |
384 | * @see java.lang.Object#toString() | |
385 | */ | |
386 | @Override | |
3b38ea61 | 387 | @SuppressWarnings("nls") |
8d2e2848 | 388 | public String toString() { |
ce785d7d | 389 | return "[TmfTrace (" + getName() + ")]"; |
8d2e2848 | 390 | } |
146a887c | 391 | |
664902f7 FC |
392 | // ------------------------------------------------------------------------ |
393 | // Indexing | |
394 | // ------------------------------------------------------------------------ | |
395 | ||
83e13355 FC |
396 | /* |
397 | * The purpose of the index is to keep the information needed to rapidly | |
398 | * restore the traces contexts at regular intervals (every INDEX_PAGE_SIZE | |
399 | * event). | |
400 | */ | |
401 | ||
402 | @SuppressWarnings({ "unchecked" }) | |
403 | private void indexTrace(boolean waitForCompletion) { | |
404 | ||
405 | fCheckpoints.clear(); | |
406 | ITmfEventRequest<TmfEvent> request = new TmfEventRequest<TmfEvent>(TmfEvent.class, TmfTimeRange.Eternity, | |
407 | TmfDataRequest.ALL_DATA, 1, ITmfDataRequest.ExecutionType.BACKGROUND) { | |
408 | ||
409 | TmfTimestamp startTime = null; | |
410 | TmfTimestamp lastTime = null; | |
411 | ||
412 | @Override | |
413 | public void handleData(TmfEvent event) { | |
414 | super.handleData(event); | |
415 | if (event != null) { | |
416 | TmfTimestamp ts = event.getTimestamp(); | |
417 | if (startTime == null) | |
418 | startTime = new TmfTimestamp(ts); | |
419 | lastTime = new TmfTimestamp(ts); | |
420 | ||
64267c9d | 421 | if ((getNbRead() % fIndexPageSize) == 0) { |
83e13355 FC |
422 | updateTrace(); |
423 | } | |
424 | } | |
425 | } | |
426 | ||
427 | @Override | |
428 | public void handleSuccess() { | |
429 | updateTrace(); | |
430 | } | |
431 | ||
432 | private void updateTrace() { | |
433 | int nbRead = getNbRead(); | |
434 | if (nbRead != 0) { | |
435 | fStartTime = startTime; | |
436 | fEndTime = lastTime; | |
437 | fNbEvents = nbRead; | |
438 | notifyListeners(); | |
439 | } | |
440 | } | |
441 | }; | |
442 | ||
443 | sendRequest((ITmfDataRequest<T>) request); | |
444 | if (waitForCompletion) | |
445 | try { | |
446 | request.waitForCompletion(); | |
447 | } catch (InterruptedException e) { | |
448 | e.printStackTrace(); | |
449 | } | |
450 | } | |
451 | ||
83e13355 FC |
452 | protected void notifyListeners() { |
453 | broadcast(new TmfTraceUpdatedSignal(this, this, new TmfTimeRange(fStartTime, fEndTime))); | |
454 | } | |
455 | ||
456 | @TmfSignalHandler | |
457 | public void handleTraceOpen(TmfTraceOpenedSignal signal) { | |
458 | ITmfTrace trace = signal.getTrace(); | |
459 | if (trace == this) { | |
460 | indexTrace(false); | |
461 | } | |
462 | } | |
664902f7 | 463 | |
abfad0aa FC |
464 | // ------------------------------------------------------------------------ |
465 | // TmfDataProvider | |
466 | // ------------------------------------------------------------------------ | |
467 | ||
83e13355 | 468 | @Override |
64267c9d FC |
469 | protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) { |
470 | ||
471 | // TODO: Handle the data requests also... | |
472 | if (!(request instanceof ITmfEventRequest<?>)) { | |
473 | super.queueRequest(request); | |
474 | return; | |
475 | } | |
476 | final ITmfEventRequest<T> eventRequest = (ITmfEventRequest<T>) request; | |
477 | ||
478 | Thread thread = new Thread() { | |
479 | @Override | |
480 | public void run() { | |
481 | ||
482 | // final long requestStart = System.nanoTime(); | |
483 | ||
484 | final Integer[] CHUNK_SIZE = new Integer[1]; | |
485 | CHUNK_SIZE[0] = blockSize + ((indexing) ? 1 : 0); | |
486 | ||
487 | final Integer[] nbRead = new Integer[1]; | |
488 | nbRead[0] = 0; | |
489 | ||
490 | // final TmfTimestamp[] timestamp = new TmfTimestamp[1]; | |
491 | // timestamp[0] = new TmfTimestamp(eventRequest.getRange().getStartTime()); | |
492 | // final TmfTimestamp endTS = eventRequest.getRange().getEndTime(); | |
493 | ||
494 | final Boolean[] isFinished = new Boolean[1]; | |
495 | isFinished[0] = Boolean.FALSE; | |
496 | ||
497 | while (!isFinished[0]) { | |
498 | ||
499 | // TmfEventRequest<T> subRequest = new TmfEventRequest<T>(eventRequest.getDataType(), new TmfTimeRange(timestamp[0], endTS), CHUNK_SIZE[0], eventRequest.getBlockize(), ExecutionType.BACKGROUND) | |
500 | // TmfDataRequest<T> subRequest = new TmfDataRequest<T>(eventRequest.getDataType(), nbRead[0], CHUNK_SIZE[0], eventRequest.getBlockize(), ExecutionType.BACKGROUND) | |
501 | TmfDataRequest<T> subRequest = new TmfDataRequest<T>(eventRequest.getDataType(), nbRead[0], CHUNK_SIZE[0], ExecutionType.BACKGROUND) | |
502 | { | |
503 | @Override | |
504 | public void handleData(T data) { | |
505 | super.handleData(data); | |
506 | eventRequest.handleData(data); | |
507 | if (getNbRead() == CHUNK_SIZE[0]) { | |
508 | nbRead[0] += getNbRead(); | |
509 | } | |
510 | if (getNbRead() > CHUNK_SIZE[0]) { | |
511 | System.out.println("ERROR - Read too many events"); //$NON-NLS-1$ | |
512 | } | |
513 | } | |
514 | ||
515 | @Override | |
516 | public void handleCompleted() { | |
517 | // System.out.println("Request completed at: " + timestamp[0]); | |
518 | if (getNbRead() < CHUNK_SIZE[0]) { | |
519 | if (isCancelled()) { | |
520 | eventRequest.cancel(); | |
521 | } | |
522 | else { | |
523 | eventRequest.done(); | |
524 | } | |
525 | isFinished[0] = Boolean.TRUE; | |
526 | nbRead[0] += getNbRead(); | |
527 | // System.out.println("fNbRead=" + getNbRead() + " total=" + nbRead[0]); | |
528 | } | |
529 | super.handleCompleted(); | |
530 | } | |
531 | }; | |
532 | ||
533 | if (!isFinished[0]) { | |
534 | queueRequest(subRequest); | |
535 | ||
536 | try { | |
537 | subRequest.waitForCompletion(); | |
538 | // System.out.println("Finished at " + timestamp[0]); | |
539 | } catch (InterruptedException e) { | |
540 | e.printStackTrace(); | |
541 | } | |
542 | ||
543 | // TmfTimestamp newTS = new TmfTimestamp(timestamp[0].getValue() + 1, timestamp[0].getScale(), timestamp[0].getPrecision()); | |
544 | // timestamp[0] = newTS; | |
545 | CHUNK_SIZE[0] = blockSize; | |
546 | // System.out.println("New timestamp: " + timestamp[0]); | |
547 | } | |
548 | } | |
549 | // final long requestEnded = System.nanoTime(); | |
550 | // System.out.println("Background request completed. Elapsed= " + (requestEnded * 1.0 - requestStart) / 1000000000); | |
551 | } | |
552 | }; | |
553 | ||
554 | thread.start(); | |
555 | } | |
556 | ||
557 | // @Override | |
558 | // protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean adjust) { | |
559 | // super.queueBackgroundRequest(request, fIndexPageSize, true); | |
560 | // } | |
f6b14ce2 | 561 | |
8c8bf09f | 562 | } |