1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.component
;
15 import java
.lang
.reflect
.Array
;
16 import java
.util
.Vector
;
17 import java
.util
.concurrent
.BlockingQueue
;
18 import java
.util
.concurrent
.LinkedBlockingQueue
;
19 import java
.util
.concurrent
.SynchronousQueue
;
21 import org
.eclipse
.linuxtools
.tmf
.Tracer
;
22 import org
.eclipse
.linuxtools
.tmf
.event
.TmfData
;
23 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
;
24 import org
.eclipse
.linuxtools
.tmf
.request
.TmfCoalescedDataRequest
;
25 import org
.eclipse
.linuxtools
.tmf
.request
.TmfDataRequest
;
26 import org
.eclipse
.linuxtools
.tmf
.request
.TmfRequestExecutor
;
27 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfEndSynchSignal
;
28 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfSignalHandler
;
29 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfStartSynchSignal
;
30 import org
.eclipse
.linuxtools
.tmf
.trace
.ITmfContext
;
33 * <b><u>TmfDataProvider</u></b>
35 * TmfDataProvider<T> is a provider for data of type <T>.
37 * This abstract class implements the housekeeping methods to register/
38 * deregister the event provider and to handle generically the event requests.
40 * The concrete class can either re-implement queueRequest() entirely or
41 * just implement the hooks (armRequest() and getNext()).
43 * TODO: Add support for providing multiple data types.
45 public abstract class TmfDataProvider
<T
extends TmfData
> extends TmfComponent
implements ITmfDataProvider
<T
> {
47 final protected Class
<T
> fType
;
49 public static final int DEFAULT_QUEUE_SIZE
= 1000;
50 protected final int fQueueSize
;
51 protected final BlockingQueue
<T
> fDataQueue
;
52 protected final TmfRequestExecutor fExecutor
;
54 private int fCoalescingLevel
= 0;
56 // ------------------------------------------------------------------------
58 // ------------------------------------------------------------------------
/**
 * Creates a provider for the given data type with the default queue size.
 *
 * @param name the provider name, forwarded to the three-argument constructor
 * @param type the class of the data handled by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
// Creates a provider with an explicit queue size, builds its data queue and
// executor, then registers the provider with TmfProviderManager under fType.
// NOTE(review): some statements of this constructor are not visible in this
// excerpt (there is no visible assignment of the final field fType, nor a
// super-constructor call) — confirm against the full file.
64 protected TmfDataProvider(String name
, Class
<T
> type
, int queueSize
) {
67 fQueueSize
= queueSize
;
// A bounded LinkedBlockingQueue is used when queueSize > 1; otherwise a
// SynchronousQueue, which holds no elements and hands each one off directly.
68 fDataQueue
= (queueSize
> 1) ?
new LinkedBlockingQueue
<T
>(fQueueSize
) : new SynchronousQueue
<T
>();
70 Tracer
.trace(getName() + " created");
72 fExecutor
= new TmfRequestExecutor();
// NOTE(review): `this` is handed to the manager from inside the constructor of
// an abstract class, i.e. before any subclass constructor body has run.
75 TmfProviderManager
.register(fType
, this);
76 Tracer
.trace(getName() + " started");
// Copy constructor: reuses the other provider's queue size but creates a fresh
// data queue and a fresh executor (neither is shared with `other`).
// Unlike the three-argument constructor, no registration with
// TmfProviderManager is visible here.
// NOTE(review): some statements are not visible in this excerpt (presumably a
// super-constructor call and the copy of fType) — confirm against the full file.
79 public TmfDataProvider(TmfDataProvider
<T
> other
) {
82 fQueueSize
= other
.fQueueSize
;
// Same queue selection rule as the three-argument constructor.
83 fDataQueue
= (fQueueSize
> 1) ?
new LinkedBlockingQueue
<T
>(fQueueSize
) : new SynchronousQueue
<T
>();
85 fExecutor
= new TmfRequestExecutor();
// Tears the provider down: removes it from TmfProviderManager (mirroring the
// register() call made at construction) and traces the stop.
// NOTE(review): other teardown statements may exist between and after these
// calls; they are not visible in this excerpt — confirm against the full file.
90 public void dispose() {
91 TmfProviderManager
.deregister(fType
, this);
93 Tracer
.trace(getName() + " stopped");
// Accessor for the queue size.
// NOTE(review): the method body is not visible in this excerpt; presumably it
// returns fQueueSize — confirm.
97 public int getQueueSize() {
// Accessor for the provider's data type, exposed as Class<?>.
// NOTE(review): the method body is not visible in this excerpt; presumably it
// returns fType — confirm.
101 public Class
<?
> getType() {
105 // ------------------------------------------------------------------------
106 // ITmfRequestHandler
107 // ------------------------------------------------------------------------
109 public synchronized void sendRequest(final ITmfDataRequest
<T
> request
) {
111 if (fCoalescingLevel
> 0) {
112 // We are in coalescing mode: client should NEVER wait
113 // (otherwise we will have deadlock...)
114 coalesceDataRequest(request
);
116 // Process the request immediately
117 queueRequest(request
);
122 * This method queues the coalesced requests.
126 private synchronized void fireRequests() {
127 for (TmfDataRequest
<T
> request
: fPendingCoalescedRequests
) {
128 queueRequest(request
);
130 fPendingCoalescedRequests
.clear();
133 // ------------------------------------------------------------------------
134 // Coalescing (primitive test...)
135 // ------------------------------------------------------------------------
137 protected Vector
<TmfCoalescedDataRequest
<T
>> fPendingCoalescedRequests
= new Vector
<TmfCoalescedDataRequest
<T
>>();
139 protected synchronized void newCoalescedDataRequest(ITmfDataRequest
<T
> request
) {
140 TmfCoalescedDataRequest
<T
> coalescedRequest
=
141 new TmfCoalescedDataRequest
<T
>(fType
, request
.getIndex(), request
.getNbRequested(), request
.getBlockize());
142 coalescedRequest
.addRequest(request
);
143 fPendingCoalescedRequests
.add(coalescedRequest
);
146 protected synchronized void coalesceDataRequest(ITmfDataRequest
<T
> request
) {
147 for (TmfCoalescedDataRequest
<T
> req
: fPendingCoalescedRequests
) {
148 if (req
.isCompatible(request
)) {
149 req
.addRequest(request
);
153 newCoalescedDataRequest(request
);
156 // ------------------------------------------------------------------------
157 // Request processing
158 // ------------------------------------------------------------------------
// Services one request asynchronously: the work is wrapped in a Thread object
// which is handed to fExecutor at the end of this method.
// NOTE(review): several lines of the anonymous Thread body are not visible in
// this excerpt (the run() header, the declaration of nbRead, the body of the
// null-context branch, the accumulation into `result`, and any completion or
// error handling) — confirm against the full file before relying on the notes
// below.
160 protected void queueRequest(final ITmfDataRequest
<T
> request
) {
// Captured up front so the worker can use it in its trace message.
162 final String provider
= getName();
164 // Process the request
165 Thread thread
= new Thread() {
170 // Extract the generic information
171 int blockSize
= request
.getBlockize();
172 int nbRequested
= request
.getNbRequested();
174 // Create the result buffer
175 Vector
<T
> result
= new Vector
<T
>();
178 // Initialize the execution
// armRequest() is documented to return null when the request can't be serviced.
179 ITmfContext context
= armRequest(request
);
180 if (context
== null) {
185 // Get the ordered events
186 Tracer
.trace("Request #" + request
.getRequestId() + " is serviced by " + provider
);
187 T data
= getNext(context
);
188 Tracer
.trace("Request #" + request
.getRequestId() + " read first event");
// Keep reading until getNext() yields null or isCompleted() reports that the
// request is satisfied.
189 while (data
!= null && !isCompleted(request
, data
, nbRead
))
// Forward a block to the requester every blockSize reads.
// NOTE(review): a blockSize of 0 would make this modulo throw
// ArithmeticException — confirm requests never carry a zero block size.
192 if (++nbRead
% blockSize
== 0) {
193 pushData(request
, result
);
195 // To avoid an unnecessary read past the last data requested
196 if (nbRead
< nbRequested
) {
197 data
= getNext(context
);
198 if (data
== null || data
.isNullRef()) {
199 Tracer
.trace("Request #" + request
.getRequestId() + " end of data");
// Forward whatever is left in the buffer once the loop is over.
203 pushData(request
, result
);
// Hand the worker to the request executor; scheduling is decided there.
207 fExecutor
.execute(thread
);
211 * Format the result data and forwards it to the requester.
212 * Note: after handling, the data is *removed*.
217 @SuppressWarnings("unchecked")
218 protected void pushData(ITmfDataRequest
<T
> request
, Vector
<T
> data
) {
219 synchronized(request
) {
220 if (!request
.isCompleted()) {
221 T
[] result
= (T
[]) Array
.newInstance(fType
, data
.size());
222 data
.toArray(result
);
223 request
.setData(result
);
224 request
.handleData();
225 data
.removeAllElements();
231 * Initialize the provider based on the request. The context is
232 * provider specific and will be updated by getNext().
235 * @return an application specific context; null if request can't be serviced
237 public abstract ITmfContext
armRequest(ITmfDataRequest
<T
> request
);
240 * Return the next piece of data based on the context supplied. The context
241 * would typically be updated for the subsequent read.
246 public T
getNext(ITmfContext context
) {
248 T event
= fDataQueue
.take();
250 } catch (InterruptedException e
) {
257 * Makes the generated result data available for getNext()
261 public void queueResult(T data
) {
263 fDataQueue
.put(data
);
264 } catch (InterruptedException e1
) {
265 e1
.printStackTrace();
270 * Checks if the data meets the request completion criteria.
276 public boolean isCompleted(ITmfDataRequest
<T
> request
, T data
, int nbRead
) {
277 return request
.isCompleted() || nbRead
>= request
.getNbRequested() || data
.isNullRef();
280 // ------------------------------------------------------------------------
282 // ------------------------------------------------------------------------
// Handler invoked with a TmfStartSynchSignal.
// NOTE(review): the method body is not visible in this excerpt; presumably it
// raises fCoalescingLevel so that sendRequest() starts coalescing — confirm.
285 public void startSynch(TmfStartSynchSignal signal
) {
292 public void endSynch(TmfEndSynchSignal signal
) {
295 if (fCoalescingLevel
== 0) {