import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.linuxtools.tmf.Tracer;
final protected Class<T> fType;
final protected boolean fLogData;
- final protected boolean fLogException;
+ final protected boolean fLogError;
public static final int DEFAULT_QUEUE_SIZE = 1000;
protected final int fQueueSize;
super(name);
fType = type;
fQueueSize = queueSize;
- fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
-
- if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "created");
+// fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
+ fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
fExecutor = new TmfRequestExecutor();
fSignalDepth = 0;
- fLogData = Tracer.isEventTraced();
- fLogException = Tracer.isEventTraced();
+ fLogData = Tracer.isEventTraced();
+ fLogError = Tracer.isErrorTraced();
TmfProviderManager.register(fType, this);
if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
super(other);
fType = other.fType;
fQueueSize = other.fQueueSize;
- fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
+// fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
+ fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
fExecutor = new TmfRequestExecutor();
fSignalDepth = 0;
- fLogData = Tracer.isEventTraced();
- fLogException = Tracer.isEventTraced();
+ fLogData = Tracer.isEventTraced();
+ fLogError = Tracer.isErrorTraced();
}
@Override
protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
synchronized(this) {
TmfCoalescedDataRequest<T> coalescedRequest =
- new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
+ new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize(), request.getExecType());
coalescedRequest.addRequest(request);
fPendingCoalescedRequests.add(coalescedRequest);
}
protected void queueRequest(final ITmfDataRequest<T> request) {
- final ITmfDataProvider<T> provider = this;
- final ITmfComponent component = this;
+ final ITmfDataProvider<T> provider = this;
+ final ITmfComponent component = this;
// Process the request
Thread thread = new Thread() {
request.done();
}
catch (Exception e) {
- e.printStackTrace();
- if (fLogException) Tracer.traceException(e);
request.fail();
+// e.printStackTrace();
}
}
};
* @return
*/
private static final int TIMEOUT = 1000;
+// public abstract T getNext(ITmfContext context) throws InterruptedException;
+// private int getLevel = 0;
public T getNext(ITmfContext context) throws InterruptedException {
- T event = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
- if (event == null) {
- if (Tracer.isErrorTraced()) Tracer.traceError("Request timeout on read");
- System.out.println(getName() + ": Request timeout on read");
+// String name = Thread.currentThread().getName(); getLevel++;
+// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
+ T data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+ if (data == null) {
+ if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on read");
throw new InterruptedException();
}
- return event;
+// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
+// getLevel--;
+ return data;
}
/**
*
* @param data
*/
+// public abstract void queueResult(T data) throws InterruptedException;
+// private int putLevel = 0;
public void queueResult(T data) throws InterruptedException {
+// String name = Thread.currentThread().getName(); putLevel++;
+// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
if (!ok) {
- if (Tracer.isErrorTraced()) Tracer.traceError("Request timeout on write");
- System.out.println(getName() + ": Request timeout on write");
+ if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on write");
throw new InterruptedException();
}
+// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
+// putLevel--;
}
/**