Bug 378402: Implementation of ControlFlow view and Resources view for
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
6c13869b 13package org.eclipse.linuxtools.tmf.core.component;
8c8bf09f 14
8c8bf09f
ASL
15import java.util.Vector;
16import java.util.concurrent.BlockingQueue;
17import java.util.concurrent.LinkedBlockingQueue;
9b635e61 18import java.util.concurrent.SynchronousQueue;
8c8bf09f 19
4918b8f2 20import org.eclipse.linuxtools.internal.tmf.core.Tracer;
8fd82db5
FC
21import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
22import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread;
23import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest;
24import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
72f1e62a 25import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
6c13869b 26import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
4c564a2d 27import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
6c13869b 28import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
6c13869b
FC
29import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
30import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
31import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
32import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
33
34/**
8fd82db5 35 * An abstract base class that implements ITmfDataProvider.
8c8bf09f 36 * <p>
8fd82db5
FC
37 * This abstract class implements the housekeeping methods to register/
38 * de-register the event provider and to handle generically the event requests.
8c8bf09f 39 * <p>
8fd82db5
FC
40 * The concrete class can either re-implement processRequest() entirely or just
41 * implement the hooks (initializeContext() and getNext()).
8c8bf09f
ASL
42 * <p>
43 * TODO: Add support for providing multiple data types.
8fd82db5
FC
44 *
45 * @version 1.0
46 * @author Francois Chouinard
8c8bf09f 47 */
72f1e62a 48public abstract class TmfDataProvider<T extends ITmfEvent> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 49
948b0607
FC
50 // ------------------------------------------------------------------------
51 // Constants
52 // ------------------------------------------------------------------------
550d787e 53
00641a97
FC
54 public static final int DEFAULT_BLOCK_SIZE = 50000;
55 public static final int DEFAULT_QUEUE_SIZE = 1000;
56
948b0607 57 // ------------------------------------------------------------------------
12c155f5 58 // Attributes
948b0607 59 // ------------------------------------------------------------------------
8c8bf09f 60
12c155f5
FC
61 protected Class<T> fType;
62 protected boolean fLogData;
63 protected boolean fLogError;
f6b14ce2 64
00641a97 65 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
12c155f5
FC
66 protected BlockingQueue<T> fDataQueue;
67 protected TmfRequestExecutor fExecutor;
948b0607
FC
68
69 private int fSignalDepth = 0;
045df77d 70 private final Object fLock = new Object();
951d134a 71
c1c69938
FC
72 private int fRequestPendingCounter = 0;
73
948b0607
FC
74 // ------------------------------------------------------------------------
75 // Constructors
76 // ------------------------------------------------------------------------
77
/**
 * Default constructor. Creates a provider with the default queue size and
 * a fresh request executor. The name and data type are set later through
 * {@link #init(String, Class)}.
 */
public TmfDataProvider() {
    super();
    fExecutor = new TmfRequestExecutor();
    fQueueSize = DEFAULT_QUEUE_SIZE;
    fDataQueue = new LinkedBlockingQueue<T>(DEFAULT_QUEUE_SIZE);
}
84
3791b5df 85 public void init(String name, Class<T> type) {
12c155f5 86 super.init(name);
3791b5df 87 fType = type;
12c155f5
FC
88 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
89
90 fExecutor = new TmfRequestExecutor();
91 fSignalDepth = 0;
92
93 fLogData = Tracer.isEventTraced();
94 fLogError = Tracer.isErrorTraced();
95
96 TmfProviderManager.register(fType, this);
97 }
98
/**
 * Full constructor: creates the provider with a specific queue size and
 * initializes it right away.
 *
 * @param name the provider name
 * @param type the type of data handled by this provider
 * @param queueSize the capacity of the data queue
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    this();
    // Must be set before init(), which sizes the queue from fQueueSize
    this.fQueueSize = queueSize;
    this.init(name, type);
}
104
/**
 * Copy constructor. The new provider gets the same name, data type and
 * queue size as the original, but its own queue and executor.
 *
 * @param other the provider to copy
 */
public TmfDataProvider(TmfDataProvider<T> other) {
    this();
    // Carry over the configured queue size; init() sizes the queue from it
    fQueueSize = other.fQueueSize;
    init(other.getName(), other.fType);
}
550d787e 109
00641a97
FC
/**
 * Standard constructor: creates and initializes a provider that uses the
 * default queue size.
 *
 * @param name the provider name
 * @param type the type of data handled by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
113
114 @Override
115 public void dispose() {
116 TmfProviderManager.deregister(fType, this);
117 fExecutor.stop();
118 super.dispose();
12c155f5 119 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
948b0607
FC
120 }
121
00641a97
FC
122 // ------------------------------------------------------------------------
123 // Accessors
124 // ------------------------------------------------------------------------
125
948b0607
FC
126 public int getQueueSize() {
127 return fQueueSize;
128 }
129
130 public Class<?> getType() {
131 return fType;
132 }
133
134 // ------------------------------------------------------------------------
135 // ITmfRequestHandler
136 // ------------------------------------------------------------------------
137
138 @Override
139 public void sendRequest(final ITmfDataRequest<T> request) {
140 synchronized (fLock) {
141 if (fSignalDepth > 0) {
142 coalesceDataRequest(request);
143 } else {
144 dispatchRequest(request);
145 }
146 }
147 }
148
149 /**
150 * This method queues the coalesced requests.
948b0607
FC
151 */
152 @Override
c1c69938 153 public void fireRequest() {
948b0607
FC
154 synchronized (fLock) {
155 if (fRequestPendingCounter > 0) {
156 return;
157 }
158 if (fPendingCoalescedRequests.size() > 0) {
159 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
160 dispatchRequest(request);
161 }
162 fPendingCoalescedRequests.clear();
163 }
164 }
165 }
166
167 /**
12c155f5
FC
168 * Increments/decrements the pending requests counters and fires the request if necessary (counter == 0). Used for
169 * coalescing requests accross multiple TmfDataProvider.
948b0607
FC
170 *
171 * @param isIncrement
172 */
c1c69938
FC
173 @Override
174 public void notifyPendingRequest(boolean isIncrement) {
948b0607 175 synchronized (fLock) {
c1c69938
FC
176 if (isIncrement) {
177 if (fSignalDepth > 0) {
178 fRequestPendingCounter++;
179 }
180 } else {
181 if (fRequestPendingCounter > 0) {
182 fRequestPendingCounter--;
183 }
948b0607 184
c1c69938
FC
185 // fire request if all pending requests are received
186 if (fRequestPendingCounter == 0) {
187 fireRequest();
188 }
189 }
190 }
948b0607
FC
191 }
192
193 // ------------------------------------------------------------------------
194 // Coalescing (primitive test...)
195 // ------------------------------------------------------------------------
196
197 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
198
199 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
200 synchronized (fLock) {
1b70b6dc 201 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(request.getDataType(), request.getIndex(),
12c155f5 202 request.getNbRequested(), request.getBlockSize(), request.getExecType());
948b0607
FC
203 coalescedRequest.addRequest(request);
204 if (Tracer.isRequestTraced()) {
90891c08
FC
205 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
206 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
207 }
208 fPendingCoalescedRequests.add(coalescedRequest);
209 }
210 }
211
212 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
213 synchronized (fLock) {
214 for (TmfCoalescedDataRequest<T> coalescedRequest : fPendingCoalescedRequests) {
215 if (coalescedRequest.isCompatible(request)) {
216 coalescedRequest.addRequest(request);
217 if (Tracer.isRequestTraced()) {
90891c08
FC
218 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
219 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
220 }
221 return;
222 }
223 }
224 newCoalescedDataRequest(request);
225 }
226 }
227
228 // ------------------------------------------------------------------------
229 // Request processing
230 // ------------------------------------------------------------------------
231
232 private void dispatchRequest(final ITmfDataRequest<T> request) {
233 if (request.getExecType() == ExecutionType.FOREGROUND)
234 queueRequest(request);
235 else
236 queueBackgroundRequest(request, request.getBlockSize(), true);
237 }
238
239 protected void queueRequest(final ITmfDataRequest<T> request) {
240
241 if (fExecutor.isShutdown()) {
242 request.cancel();
243 return;
244 }
245
246 final TmfDataProvider<T> provider = this;
247
248 // Process the request
249 TmfThread thread = new TmfThread(request.getExecType()) {
475743b7 250
948b0607
FC
251 @Override
252 public void run() {
253
4cf201de 254 if (Tracer.isRequestTraced()) {
90891c08 255 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
4cf201de 256 }
948b0607
FC
257
258 // Extract the generic information
259 request.start();
260 int nbRequested = request.getNbRequested();
261 int nbRead = 0;
262
263 // Initialize the execution
264 ITmfContext context = armRequest(request);
265 if (context == null) {
266 request.cancel();
267 return;
268 }
269
270 try {
271 // Get the ordered events
272 T data = getNext(context);
273 if (Tracer.isRequestTraced())
90891c08 274 Tracer.traceRequest(request, "read first event"); //$NON-NLS-1$
948b0607 275 while (data != null && !isCompleted(request, data, nbRead)) {
4cf201de
FC
276// if (fLogData)
277// Tracer.traceEvent(provider, request, data);
1b70b6dc
PT
278 if (request.getDataType().isInstance(data)) {
279 request.handleData(data);
280 }
948b0607
FC
281
282 // To avoid an unnecessary read passed the last data
283 // requested
284 if (++nbRead < nbRequested) {
285 data = getNext(context);
286 }
287 }
288 if (Tracer.isRequestTraced())
90891c08 289 Tracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$
948b0607
FC
290
291 if (request.isCancelled()) {
292 request.cancel();
293 } else {
294 request.done();
295 }
296 } catch (Exception e) {
297 request.fail();
298 }
299
300 // Cleanup
301 context.dispose();
302 }
475743b7
FC
303
304 @Override
305 public void cancel() {
8a0edc79
FC
306 if (!request.isCompleted()) {
307 request.cancel();
475743b7
FC
308 }
309 }
948b0607
FC
310 };
311
312 if (Tracer.isRequestTraced())
90891c08 313 Tracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
948b0607
FC
314 fExecutor.execute(thread);
315
316 }
317
318 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) {
319
4cf201de
FC
320 final TmfDataProvider<T> provider = this;
321
948b0607
FC
322 Thread thread = new Thread() {
323 @Override
324 public void run() {
4cf201de
FC
325
326 if (Tracer.isRequestTraced()) {
327 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
328 }
329
948b0607
FC
330 request.start();
331
332 final Integer[] CHUNK_SIZE = new Integer[1];
333 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
334
335 final Integer[] nbRead = new Integer[1];
336 nbRead[0] = 0;
337
338 final Boolean[] isFinished = new Boolean[1];
339 isFinished[0] = Boolean.FALSE;
340
341 while (!isFinished[0]) {
342
12c155f5
FC
343 TmfDataRequest<T> subRequest = new TmfDataRequest<T>(request.getDataType(), request.getIndex()
344 + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
948b0607
FC
345 @Override
346 public void handleData(T data) {
347 super.handleData(data);
1b70b6dc
PT
348 if (request.getDataType().isInstance(data)) {
349 request.handleData(data);
350 }
948b0607
FC
351 if (getNbRead() > CHUNK_SIZE[0]) {
352 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
353 }
354 }
355
356 @Override
357 public void handleCompleted() {
358 nbRead[0] += getNbRead();
359 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
90de83da 360 if (this.isCancelled()) {
948b0607 361 request.cancel();
90de83da 362 } else if (this.isFailed()) {
12c155f5 363 request.fail();
948b0607
FC
364 } else {
365 request.done();
366 }
367 isFinished[0] = Boolean.TRUE;
368 }
369 super.handleCompleted();
370 }
371 };
372
373 if (!isFinished[0]) {
374 queueRequest(subRequest);
375
376 try {
377 subRequest.waitForCompletion();
378 } catch (InterruptedException e) {
379 e.printStackTrace();
380 }
381
382 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
383 }
384 }
385 }
386 };
387
388 thread.start();
389 }
390
391 /**
d337369a
FC
392 * Initialize the provider based on the request. The context is provider
393 * specific and will be updated by getNext().
948b0607
FC
394 *
395 * @param request
12c155f5 396 * @return an application specific context; null if request can't be serviced
948b0607 397 */
9e0640dc 398 protected abstract ITmfContext armRequest(ITmfDataRequest<T> request);
948b0607 399
c32744d6
FC
400// /**
401// * Return the next event based on the context supplied. The context
402// * will be updated for the subsequent read.
403// *
404// * @param context the trace read context (updated)
405// * @return the event referred to by context
406// */
407// public abstract T getNext(ITmfContext context);
948b0607
FC
408
409 /**
410 * Checks if the data meets the request completion criteria.
411 *
0d9a6d76
FC
412 * @param request the request
413 * @param data the data to verify
414 * @param nbRead the number of events read so far
415 * @return true if completion criteria is met
948b0607
FC
416 */
417 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
b6be1c3e 418 return request.isCompleted() || nbRead >= request.getNbRequested();
948b0607
FC
419 }
420
421 // ------------------------------------------------------------------------
422 // Signal handlers
423 // ------------------------------------------------------------------------
424
425 @TmfSignalHandler
426 public void startSynch(TmfStartSynchSignal signal) {
427 synchronized (fLock) {
428 fSignalDepth++;
429 }
430 }
431
432 @TmfSignalHandler
433 public void endSynch(TmfEndSynchSignal signal) {
045df77d 434 synchronized (fLock) {
948b0607
FC
435 fSignalDepth--;
436 if (fSignalDepth == 0) {
437 fireRequest();
438 }
045df77d 439 }
948b0607 440 }
8c8bf09f
ASL
441
442}
This page took 0.068965 seconds and 5 git commands to generate.