1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.component
;
15 import java
.lang
.reflect
.Array
;
16 import java
.util
.Vector
;
17 import java
.util
.concurrent
.BlockingQueue
;
18 import java
.util
.concurrent
.LinkedBlockingQueue
;
19 import java
.util
.concurrent
.SynchronousQueue
;
21 import org
.eclipse
.linuxtools
.tmf
.event
.TmfData
;
22 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
;
23 import org
.eclipse
.linuxtools
.tmf
.request
.TmfCoalescedDataRequest
;
24 import org
.eclipse
.linuxtools
.tmf
.request
.TmfDataRequest
;
25 import org
.eclipse
.linuxtools
.tmf
.request
.TmfRequestExecutor
;
26 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfEndSynchSignal
;
27 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfSignalHandler
;
28 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfStartSynchSignal
;
29 import org
.eclipse
.linuxtools
.tmf
.trace
.ITmfContext
;
/**
 * <b><u>TmfDataProvider</u></b>
 * <p>
 * The {@code TmfDataProvider<T>} is a provider for data of type {@code T}.
 * <p>
 * This abstract class implements the housekeeping methods to register/
 * deregister the data provider and to handle the data requests generically.
 * <p>
 * The concrete class can either re-implement queueRequest() entirely or
 * just implement the hooks (armRequest() and getNext()).
 * <p>
 * TODO: Add support for providing multiple data types.
 */
44 public abstract class TmfDataProvider
<T
extends TmfData
> extends TmfComponent
implements ITmfDataProvider
<T
> {
46 final protected Class
<T
> fType
;
48 public static final int DEFAULT_QUEUE_SIZE
= 1000;
49 protected final int fQueueSize
;
50 protected final BlockingQueue
<T
> fDataQueue
;
51 protected final TmfRequestExecutor fExecutor
;
53 private Integer fSynchDepth
;
55 // ------------------------------------------------------------------------
57 // ------------------------------------------------------------------------
/**
 * Creates a provider for the given data type using DEFAULT_QUEUE_SIZE.
 *
 * @param name the name, forwarded to the three-argument constructor
 * @param type the class of the data handled by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
63 protected TmfDataProvider(String name
, Class
<T
> type
, int queueSize
) {
65 fQueueSize
= queueSize
;
67 fDataQueue
= (queueSize
> 1) ?
new LinkedBlockingQueue
<T
>(fQueueSize
) : new SynchronousQueue
<T
>();
69 fExecutor
= new TmfRequestExecutor();
75 public void register() {
77 TmfProviderManager
.register(fType
, this);
81 public void deregister() {
82 TmfProviderManager
.deregister(fType
, this);
86 public int getQueueSize() {
90 // ------------------------------------------------------------------------
92 // ------------------------------------------------------------------------
94 public void sendRequest(final TmfDataRequest
<T
> request
) {
96 if (fSynchDepth
> 0) {
97 // We are in coalescing mode: client should NEVER wait
98 // (otherwise we will have deadlock...)
99 coalesceDataRequest(request
);
101 // Process the request immediately
102 queueRequest(request
);
107 * This method queues the coalesced requests.
111 private synchronized void fireRequests() {
112 for (TmfDataRequest
<T
> request
: fPendingCoalescedRequests
) {
113 queueRequest(request
);
115 fPendingCoalescedRequests
.clear();
118 // ------------------------------------------------------------------------
119 // Coalescing (primitive test...)
120 // ------------------------------------------------------------------------
122 protected Vector
<TmfCoalescedDataRequest
<T
>> fPendingCoalescedRequests
= new Vector
<TmfCoalescedDataRequest
<T
>>();
124 protected synchronized void newCoalescedDataRequest(TmfDataRequest
<T
> request
) {
125 TmfCoalescedDataRequest
<T
> coalescedRequest
=
126 new TmfCoalescedDataRequest
<T
>(fType
, request
.getIndex(), request
.getNbRequested(), request
.getBlockize());
127 coalescedRequest
.addRequest(request
);
128 fPendingCoalescedRequests
.add(coalescedRequest
);
131 protected synchronized void coalesceDataRequest(TmfDataRequest
<T
> request
) {
132 for (TmfCoalescedDataRequest
<T
> req
: fPendingCoalescedRequests
) {
133 if (req
.isCompatible(request
)) {
134 req
.addRequest(request
);
138 newCoalescedDataRequest(request
);
141 // ------------------------------------------------------------------------
142 // Request processing
143 // ------------------------------------------------------------------------
145 protected void queueRequest(final TmfDataRequest
<T
> request
) {
147 // Process the request
148 Thread thread
= new Thread() {
153 // Extract the generic information
154 int blockSize
= request
.getBlockize();
155 int nbRequested
= request
.getNbRequested();
157 // Create the result buffer
158 Vector
<T
> result
= new Vector
<T
>();
161 // Initialize the execution
162 ITmfContext context
= armRequest(request
);
163 if (context
== null) {
168 // Get the ordered events
169 T data
= getNext(context
);
170 while (data
!= null && !isCompleted(request
, data
, nbRead
))
173 if (++nbRead
% blockSize
== 0) {
174 pushData(request
, result
);
// To avoid an unnecessary read past the last data requested
179 if (nbRead
< nbRequested
) {
180 data
= getNext(context
);
181 // while (data != null && data.isNull())
182 // data = getNext(context);
185 pushData(request
, result
);
189 fExecutor
.queueRequest(thread
);
193 * Format the result data and forwards it to the requester.
194 * Note: after handling, the data is *removed*.
199 @SuppressWarnings("unchecked")
200 protected void pushData(ITmfDataRequest
<T
> request
, Vector
<T
> data
) {
201 synchronized(request
) {
202 if (!request
.isCompleted()) {
203 T
[] result
= (T
[]) Array
.newInstance(fType
, data
.size());
204 data
.toArray(result
);
205 request
.setData(result
);
206 request
.handleData();
207 data
.removeAllElements();
213 * Initialize the provider based on the request. The context is
214 * provider specific and will be updated by getNext().
217 * @return an application specific context; null if request can't be serviced
219 public abstract ITmfContext
armRequest(TmfDataRequest
<T
> request
);
222 * Return the next piece of data based on the context supplied. The context
223 * would typically be updated for the subsequent read.
228 public T
getNext(ITmfContext context
) {
230 T event
= fDataQueue
.take();
232 } catch (InterruptedException e
) {
239 * Makes the generated result data available for getNext()
243 public void queueResult(T data
) {
245 fDataQueue
.put(data
);
246 } catch (InterruptedException e1
) {
247 e1
.printStackTrace();
252 * Checks if the data meets the request completion criteria.
258 public boolean isCompleted(TmfDataRequest
<T
> request
, T data
, int nbRead
) {
259 return request
.isCompleted() || nbRead
>= request
.getNbRequested();
262 // ------------------------------------------------------------------------
264 // ------------------------------------------------------------------------
267 public void startSynch(TmfStartSynchSignal signal
) {
268 synchronized(fSynchDepth
) {
274 public void endSynch(TmfEndSynchSignal signal
) {
275 synchronized(fSynchDepth
) {
277 if (fSynchDepth
== 0) {