2010-06-21 fchouinard@gmail.com Fix for Bug316276
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
9b635e61 19import java.util.concurrent.SynchronousQueue;
550d787e 20import java.util.concurrent.TimeUnit;
8c8bf09f 21
ce785d7d 22import org.eclipse.linuxtools.tmf.Tracer;
8c8bf09f 23import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a 24import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
9b635e61 25import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
951d134a 26import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
27import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
28import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
29import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
30import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
31import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
32import org.eclipse.linuxtools.tmf.trace.ITmfContext;
33
34/**
35 * <b><u>TmfDataProvider</u></b>
36 * <p>
37 * The TmfDataProvider&lt;T&gt; is a provider for data of type T.
38 * <p>
39 * This abstract class implements the housekeeping methods to register/
40 * deregister the event provider and to handle generically the event requests.
41 * <p>
42 * The concrete class can either re-implement processRequest() entirely or
43 * just implement the hooks (initializeContext() and getNext()).
44 * <p>
45 * TODO: Add support for providing multiple data types.
46 */
951d134a 47public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 48
550d787e
FC
49 // ------------------------------------------------------------------------
50 // Constants
51 // ------------------------------------------------------------------------
52
9b635e61 53// private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
550d787e
FC
54// private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
55
56 // ------------------------------------------------------------------------
57 // Attributes
58 // ------------------------------------------------------------------------
59
8c8bf09f 60 final protected Class<T> fType;
550d787e 61 final protected boolean fLogData;
cb866e08 62 final protected boolean fLogError;
8c8bf09f
ASL
63
64 public static final int DEFAULT_QUEUE_SIZE = 1000;
65 protected final int fQueueSize;
66 protected final BlockingQueue<T> fDataQueue;
67 protected final TmfRequestExecutor fExecutor;
68
550d787e 69 private int fSignalDepth = 0;
951d134a 70
8c8bf09f 71 // ------------------------------------------------------------------------
951d134a 72 // Constructors
8c8bf09f
ASL
73 // ------------------------------------------------------------------------
74
/**
 * Creates a provider whose data queue is sized with
 * {@link #DEFAULT_QUEUE_SIZE}.
 *
 * @param name the component name
 * @param type the class of the data handled by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
78
/**
 * Creates a provider with an explicit queue size and registers it with
 * the TmfProviderManager under its data type.
 * <p>
 * A queue size greater than 1 yields a bounded LinkedBlockingQueue of
 * that capacity; any other value yields a SynchronousQueue.
 *
 * @param name the component name
 * @param type the class of the data handled by this provider
 * @param queueSize the requested capacity of the data queue
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    super(name);

    fType = type;
    fQueueSize = queueSize;
    if (fQueueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    fLogData = Tracer.isEventTraced();
    fLogError = Tracer.isErrorTraced();

    // Registration comes last, once every field has been assigned
    TmfProviderManager.register(fType, this);
}
377f1ad8 95
ce785d7d
FC
/**
 * Copy constructor.
 * <p>
 * The data type and queue size are taken from the other provider; the
 * data queue and the request executor are created fresh. Unlike the
 * (name, type, queueSize) constructor, this one does not register the
 * new instance with the TmfProviderManager.
 *
 * @param other the provider to copy the settings from
 */
public TmfDataProvider(TmfDataProvider<T> other) {
    super(other);

    fType = other.fType;
    fQueueSize = other.fQueueSize;
    if (fQueueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    fLogData = Tracer.isEventTraced();
    fLogError = Tracer.isErrorTraced();
}
109
fc6ccf6f 110 @Override
2fb2eb37 111 public void dispose() {
8c8bf09f 112 TmfProviderManager.deregister(fType, this);
54d55ced 113 fExecutor.stop();
550d787e 114
9b635e61 115// if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
550d787e 116
2fb2eb37 117 super.dispose();
8c8bf09f
ASL
118 }
119
120 public int getQueueSize() {
121 return fQueueSize;
122 }
123
ff4ed569
FC
124 public Class<?> getType() {
125 return fType;
126 }
127
8c8bf09f
ASL
128 // ------------------------------------------------------------------------
129 // ITmfRequestHandler
130 // ------------------------------------------------------------------------
131
550d787e
FC
132 public void sendRequest(final ITmfDataRequest<T> request) {
133 synchronized(this) {
9b635e61
FC
134 if (fSignalDepth > 0) {
135 coalesceDataRequest(request);
136 } else {
137 dispatchRequest(request);
550d787e 138 }
951d134a
FC
139 }
140 }
141
142 /**
143 * This method queues the coalesced requests.
144 *
145 * @param thread
146 */
550d787e
FC
147 public void fireRequests() {
148 synchronized(this) {
149 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
9b635e61 150 dispatchRequest(request);
550d787e
FC
151 }
152 fPendingCoalescedRequests.clear();
951d134a 153 }
951d134a
FC
154 }
155
156 // ------------------------------------------------------------------------
157 // Coalescing (primitive test...)
158 // ------------------------------------------------------------------------
8c8bf09f 159
951d134a 160 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
8c8bf09f 161
550d787e
FC
162 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
163 synchronized(this) {
164 TmfCoalescedDataRequest<T> coalescedRequest =
cb866e08 165 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize(), request.getExecType());
550d787e 166 coalescedRequest.addRequest(request);
9b635e61
FC
167 if (Tracer.isRequestTraced()) {
168 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId());
169 Tracer.traceRequest(coalescedRequest, "added " + request.getRequestId());
170 }
550d787e
FC
171 fPendingCoalescedRequests.add(coalescedRequest);
172 }
951d134a 173 }
8c8bf09f 174
2fb2eb37 175 protected synchronized void coalesceDataRequest(ITmfDataRequest<T> request) {
550d787e
FC
176 synchronized(this) {
177 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
178 if (req.isCompatible(request)) {
179 req.addRequest(request);
9b635e61
FC
180 if (Tracer.isRequestTraced()) {
181 Tracer.traceRequest(request, "coalesced with " + req.getRequestId());
182 Tracer.traceRequest(req, "added " + request.getRequestId());
183 }
550d787e
FC
184 return;
185 }
951d134a 186 }
550d787e 187 newCoalescedDataRequest(request);
8c8bf09f 188 }
8c8bf09f
ASL
189 }
190
951d134a
FC
191 // ------------------------------------------------------------------------
192 // Request processing
193 // ------------------------------------------------------------------------
194
9b635e61
FC
195 private void dispatchRequest(final ITmfDataRequest<T> request) {
196 if (request.getExecType() == ExecutionType.SHORT)
197 queueRequest(request);
198 else
199 queueLongRequest(request);
200 }
201
2fb2eb37 202 protected void queueRequest(final ITmfDataRequest<T> request) {
9aae0442 203
9b635e61
FC
204// final ITmfDataProvider<T> provider = this;
205// final ITmfComponent component = this;
550d787e 206
8c8bf09f 207 // Process the request
9b635e61 208 TmfThread thread = new TmfThread(request.getExecType()) {
8c8bf09f
ASL
209
210 @Override
211 public void run() {
212
9b635e61
FC
213// /////
214// String message = (System.currentTimeMillis() + ": Req=" + request.getRequestId() +
215// (request.getExecType() == ITmfDataRequest.ExecutionType.LONG ? "(long)" : "(short)") +
216// ", Type=" + request.getClass().getName() +
217// ", DataType=" + request.getDataType().getSimpleName() + " " + "started");
218// System.out.println(message);
219// ////
220
221// if (request.getExecType() == ExecutionType.LONG) {
222// setPriority(Thread.MIN_PRIORITY);
223// } else {
224// setPriority(Thread.MAX_PRIORITY);
225// }
226// yield();
227
8c8bf09f 228 // Extract the generic information
550d787e 229 request.start();
8c8bf09f
ASL
230 int blockSize = request.getBlockize();
231 int nbRequested = request.getNbRequested();
232
233 // Create the result buffer
234 Vector<T> result = new Vector<T>();
235 int nbRead = 0;
236
237 // Initialize the execution
238 ITmfContext context = armRequest(request);
239 if (context == null) {
550d787e 240 request.cancel();
8c8bf09f
ASL
241 return;
242 }
243
550d787e
FC
244 try {
245 // Get the ordered events
9b635e61 246// if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + component.getName());
550d787e 247 T data = getNext(context);
9b635e61 248// if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
550d787e
FC
249 while (data != null && !isCompleted(request, data, nbRead))
250 {
9b635e61 251// if (fLogData) Tracer.traceEvent(provider, request, data);
550d787e
FC
252 result.add(data);
253 if (++nbRead % blockSize == 0) {
254 pushData(request, result);
ce785d7d 255 }
550d787e
FC
256 // To avoid an unnecessary read passed the last data requested
257 if (nbRead < nbRequested) {
258 data = getNext(context);
9b635e61
FC
259// if (data == null || data.isNullRef()) {
260// if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " end of data");
261// }
550d787e
FC
262 }
263 }
264 if (result.size() > 0) {
265 pushData(request, result);
7f407ead 266 }
550d787e 267 request.done();
9b635e61
FC
268
269// ////
270// message = (System.currentTimeMillis() + ": Req=" + request.getRequestId() +
271// (request.getExecType() == ITmfDataRequest.ExecutionType.LONG ? "(long)" : "(short)") +
272// ", Type=" + request.getClass().getName() +
273// ", DataType=" + request.getDataType().getSimpleName() + " " + "completed");
274// System.out.println(message);
275// ////
550d787e
FC
276 }
277 catch (Exception e) {
9b635e61 278 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "exception (failed)");
550d787e 279 request.fail();
9b635e61 280 return;
cb866e08 281// e.printStackTrace();
8c8bf09f 282 }
8c8bf09f
ASL
283 }
284 };
9b635e61
FC
285// /////
286// String message = (System.currentTimeMillis() + ": Req=" + request.getRequestId() +
287// (request.getExecType() == ITmfDataRequest.ExecutionType.LONG ? "(long)" : "(short)") +
288// ", Type=" + request.getClass().getName() +
289// ", DataType=" + request.getDataType().getSimpleName() + " " + "queued");
290// System.out.println(message);
291// ////
5c00c0b7 292 fExecutor.execute(thread);
9b635e61 293
550d787e 294 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
8c8bf09f
ASL
295 }
296
9b635e61
FC
297 // By default, same behavior as a short request
298 protected void queueLongRequest(final ITmfDataRequest<T> request) {
299 queueRequest(request);
300 }
301
8c8bf09f
ASL
302 /**
303 * Format the result data and forwards it to the requester.
304 * Note: after handling, the data is *removed*.
305 *
306 * @param request
307 * @param data
308 */
309 @SuppressWarnings("unchecked")
951d134a 310 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
8c8bf09f
ASL
311 synchronized(request) {
312 if (!request.isCompleted()) {
313 T[] result = (T[]) Array.newInstance(fType, data.size());
314 data.toArray(result);
315 request.setData(result);
316 request.handleData();
317 data.removeAllElements();
318 }
319 }
320 }
321
951d134a
FC
322 /**
323 * Initialize the provider based on the request. The context is
324 * provider specific and will be updated by getNext().
325 *
326 * @param request
327 * @return an application specific context; null if request can't be serviced
328 */
2fb2eb37 329 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
951d134a 330
8c8bf09f
ASL
331 /**
332 * Return the next piece of data based on the context supplied. The context
333 * would typically be updated for the subsequent read.
334 *
335 * @param context
336 * @return
337 */
9b635e61 338 private static final int TIMEOUT = 10000;
cb866e08
FC
339// public abstract T getNext(ITmfContext context) throws InterruptedException;
340// private int getLevel = 0;
550d787e 341 public T getNext(ITmfContext context) throws InterruptedException {
cb866e08
FC
342// String name = Thread.currentThread().getName(); getLevel++;
343// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
344 T data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
345 if (data == null) {
9b635e61 346// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on read");
550d787e 347 throw new InterruptedException();
8c8bf09f 348 }
cb866e08
FC
349// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
350// getLevel--;
351 return data;
8c8bf09f
ASL
352 }
353
951d134a
FC
354 /**
355 * Makes the generated result data available for getNext()
356 *
357 * @param data
358 */
cb866e08
FC
359// public abstract void queueResult(T data) throws InterruptedException;
360// private int putLevel = 0;
550d787e 361 public void queueResult(T data) throws InterruptedException {
cb866e08
FC
362// String name = Thread.currentThread().getName(); putLevel++;
363// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
550d787e
FC
364 boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
365 if (!ok) {
9b635e61 366// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on write");
550d787e 367 throw new InterruptedException();
8c8bf09f 368 }
cb866e08
FC
369// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
370// putLevel--;
8c8bf09f
ASL
371 }
372
373 /**
374 * Checks if the data meets the request completion criteria.
375 *
376 * @param request
377 * @param data
378 * @return
379 */
2fb2eb37 380 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
36548af3 381 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
8c8bf09f
ASL
382 }
383
951d134a
FC
384 // ------------------------------------------------------------------------
385 // Signal handlers
386 // ------------------------------------------------------------------------
387
8c8bf09f 388 @TmfSignalHandler
550d787e
FC
389 public synchronized void startSynch(TmfStartSynchSignal signal) {
390 fSignalDepth++;
8c8bf09f
ASL
391 }
392
393 @TmfSignalHandler
550d787e
FC
394 public synchronized void endSynch(TmfEndSynchSignal signal) {
395 fSignalDepth--;
396 if (fSignalDepth == 0) {
397 fireRequests();
951d134a 398 }
8c8bf09f
ASL
399 }
400
401}
This page took 0.049289 seconds and 5 git commands to generate.