[Bug309042] Some improvements on TmfExperiment and its context. Also fixed a number...
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.component;
14
15 import java.lang.reflect.Array;
16 import java.util.Vector;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.SynchronousQueue;
20
21 import org.eclipse.linuxtools.tmf.event.TmfData;
22 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
23 import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
24 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
25 import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
26 import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
27 import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
28 import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
29 import org.eclipse.linuxtools.tmf.trace.ITmfContext;
30
31 /**
32 * <b><u>TmfProvider</u></b>
33 * <p>
34 * The TmfProvider<T> is a provider for a data of type <T>.
35 * <p>
36 * This abstract class implements the housekeeking methods to register/
37 * deregister the event provider and to handle generically the event requests.
38 * <p>
39 * The concrete class can either re-implement processRequest() entirely or
40 * just implement the hooks (initializeContext() and getNext()).
41 * <p>
42 * TODO: Add support for providing multiple data types.
43 */
44 public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
45
46 final protected Class<T> fType;
47
48 public static final int DEFAULT_QUEUE_SIZE = 1000;
49 protected final int fQueueSize;
50 protected final BlockingQueue<T> fDataQueue;
51 protected final TmfRequestExecutor fExecutor;
52
53 private Integer fSynchDepth;
54
55 // ------------------------------------------------------------------------
56 // Constructors
57 // ------------------------------------------------------------------------
58
59 public TmfDataProvider(String name, Class<T> type) {
60 this(name, type, DEFAULT_QUEUE_SIZE);
61 }
62
63 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
64 super(name);
65 fQueueSize = queueSize;
66 fType = type;
67 fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
68
69 fExecutor = new TmfRequestExecutor();
70 fSynchDepth = 0;
71
72 TmfProviderManager.register(fType, this);
73 }
74
75 @Override
76 public void register() {
77 super.register();
78 TmfProviderManager.register(fType, this);
79 }
80
81 @Override
82 public void deregister() {
83 TmfProviderManager.deregister(fType, this);
84 fExecutor.stop();
85 super.deregister();
86 }
87
88 public int getQueueSize() {
89 return fQueueSize;
90 }
91
92 // ------------------------------------------------------------------------
93 // ITmfRequestHandler
94 // ------------------------------------------------------------------------
95
96 public void sendRequest(final TmfDataRequest<T> request) {
97
98 if (fSynchDepth > 0) {
99 // We are in coalescing mode: client should NEVER wait
100 // (otherwise we will have deadlock...)
101 coalesceDataRequest(request);
102 } else {
103 // Process the request immediately
104 queueRequest(request);
105 }
106 }
107
108 /**
109 * This method queues the coalesced requests.
110 *
111 * @param thread
112 */
113 private synchronized void fireRequests() {
114 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
115 queueRequest(request);
116 }
117 fPendingCoalescedRequests.clear();
118 }
119
120 // ------------------------------------------------------------------------
121 // Coalescing (primitive test...)
122 // ------------------------------------------------------------------------
123
124 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
125
126 protected synchronized void newCoalescedDataRequest(TmfDataRequest<T> request) {
127 TmfCoalescedDataRequest<T> coalescedRequest =
128 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
129 coalescedRequest.addRequest(request);
130 fPendingCoalescedRequests.add(coalescedRequest);
131 }
132
133 protected synchronized void coalesceDataRequest(TmfDataRequest<T> request) {
134 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
135 if (req.isCompatible(request)) {
136 req.addRequest(request);
137 return;
138 }
139 }
140 newCoalescedDataRequest(request);
141 }
142
143 // ------------------------------------------------------------------------
144 // Request processing
145 // ------------------------------------------------------------------------
146
147 protected void queueRequest(final TmfDataRequest<T> request) {
148
149 // Process the request
150 Thread thread = new Thread() {
151
152 @Override
153 public void run() {
154
155 // Extract the generic information
156 int blockSize = request.getBlockize();
157 int nbRequested = request.getNbRequested();
158
159 // Create the result buffer
160 Vector<T> result = new Vector<T>();
161 int nbRead = 0;
162
163 // Initialize the execution
164 ITmfContext context = armRequest(request);
165 if (context == null) {
166 request.fail();
167 return;
168 }
169
170 // Get the ordered events
171 T data = getNext(context);
172 while (data != null && !isCompleted(request, data, nbRead))
173 {
174 result.add(data);
175 if (++nbRead % blockSize == 0) {
176 pushData(request, result);
177 }
178 // To avoid an unnecessary read passed the last data requested
179 if (nbRead < nbRequested) {
180 data = getNext(context);
181 }
182 }
183 pushData(request, result);
184 request.done();
185 }
186 };
187 fExecutor.queueRequest(thread);
188 }
189
190 /**
191 * Format the result data and forwards it to the requester.
192 * Note: after handling, the data is *removed*.
193 *
194 * @param request
195 * @param data
196 */
197 @SuppressWarnings("unchecked")
198 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
199 synchronized(request) {
200 if (!request.isCompleted()) {
201 T[] result = (T[]) Array.newInstance(fType, data.size());
202 data.toArray(result);
203 request.setData(result);
204 request.handleData();
205 data.removeAllElements();
206 }
207 }
208 }
209
210 /**
211 * Initialize the provider based on the request. The context is
212 * provider specific and will be updated by getNext().
213 *
214 * @param request
215 * @return an application specific context; null if request can't be serviced
216 */
217 public abstract ITmfContext armRequest(TmfDataRequest<T> request);
218
219 /**
220 * Return the next piece of data based on the context supplied. The context
221 * would typically be updated for the subsequent read.
222 *
223 * @param context
224 * @return
225 */
226 public T getNext(ITmfContext context) {
227 try {
228 T event = fDataQueue.take();
229 return event;
230 } catch (InterruptedException e) {
231 e.printStackTrace();
232 }
233 return null;
234 }
235
236 /**
237 * Makes the generated result data available for getNext()
238 *
239 * @param data
240 */
241 public void queueResult(T data) {
242 try {
243 fDataQueue.put(data);
244 } catch (InterruptedException e1) {
245 e1.printStackTrace();
246 }
247 }
248
249 /**
250 * Checks if the data meets the request completion criteria.
251 *
252 * @param request
253 * @param data
254 * @return
255 */
256 public boolean isCompleted(TmfDataRequest<T> request, T data, int nbRead) {
257 return request.isCompleted() || nbRead >= request.getNbRequested();
258 }
259
260 // ------------------------------------------------------------------------
261 // Signal handlers
262 // ------------------------------------------------------------------------
263
264 @TmfSignalHandler
265 public void startSynch(TmfStartSynchSignal signal) {
266 synchronized(this) {
267 fSynchDepth++;
268 }
269 }
270
271 @TmfSignalHandler
272 public void endSynch(TmfEndSynchSignal signal) {
273 synchronized(this) {
274 fSynchDepth--;
275 if (fSynchDepth == 0) {
276 fireRequests();
277 }
278 }
279 }
280
281 }
This page took 0.059644 seconds and 5 git commands to generate.