Commit | Line | Data |
---|---|---|
8c8bf09f | 1 | /******************************************************************************* |
8967c8c0 | 2 | * Copyright (c) 2009, 2014 Ericsson |
0283f7ff | 3 | * |
8c8bf09f ASL |
4 | * All rights reserved. This program and the accompanying materials are |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
0283f7ff | 8 | * |
8c8bf09f | 9 | * Contributors: |
fd3f1eff AM |
10 | * Francois Chouinard - Initial API and implementation, replace background |
11 | * requests by preemptable requests | |
12 | * Alexandre Montplaisir - Merge with TmfDataProvider | |
8967c8c0 | 13 | * Bernd Hufmann - Add timer based coalescing for background requests |
8c8bf09f ASL |
14 | *******************************************************************************/ |
15 | ||
6c13869b | 16 | package org.eclipse.linuxtools.tmf.core.component; |
8c8bf09f | 17 | |
fd3f1eff | 18 | import java.util.ArrayList; |
8967c8c0 | 19 | import java.util.Iterator; |
fd3f1eff | 20 | import java.util.List; |
8967c8c0 BH |
21 | import java.util.Timer; |
22 | import java.util.TimerTask; | |
fd3f1eff AM |
23 | import java.util.concurrent.BlockingQueue; |
24 | import java.util.concurrent.LinkedBlockingQueue; | |
25 | import java.util.concurrent.SynchronousQueue; | |
26 | ||
5419a136 | 27 | import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer; |
fd3f1eff AM |
28 | import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread; |
29 | import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager; | |
5419a136 | 30 | import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedEventRequest; |
fd3f1eff | 31 | import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor; |
72f1e62a | 32 | import org.eclipse.linuxtools.tmf.core.event.ITmfEvent; |
5419a136 | 33 | import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest; |
fd3f1eff AM |
34 | import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest.ExecutionType; |
35 | import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal; | |
36 | import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler; | |
37 | import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal; | |
3bd46eef | 38 | import org.eclipse.linuxtools.tmf.core.timestamp.ITmfTimestamp; |
fd3f1eff | 39 | import org.eclipse.linuxtools.tmf.core.trace.ITmfContext; |
8c8bf09f ASL |
40 | |
41 | /** | |
fd3f1eff AM |
42 | * An abstract base class that implements ITmfEventProvider. |
43 | * <p> | |
44 | * This abstract class implements the housekeeping methods to register/ | |
45 | * de-register the event provider and to handle generically the event requests. | |
46 | * </p> | |
0283f7ff | 47 | * |
8fd82db5 | 48 | * @author Francois Chouinard |
c4767854 | 49 | * @since 3.0 |
8c8bf09f | 50 | */ |
fd3f1eff AM |
51 | public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider { |
52 | ||
53 | // ------------------------------------------------------------------------ | |
54 | // Constants | |
55 | // ------------------------------------------------------------------------ | |
56 | ||
c4767854 AM |
57 | /** Default amount of events per request "chunk" |
58 | * @since 3.0 */ | |
fd3f1eff AM |
59 | public static final int DEFAULT_BLOCK_SIZE = 50000; |
60 | ||
c4767854 AM |
61 | /** Default size of the queue |
62 | * @since 3.0 */ | |
fd3f1eff AM |
63 | public static final int DEFAULT_QUEUE_SIZE = 1000; |
64 | ||
8967c8c0 BH |
65 | /** Delay for coalescing background requests (in milli-seconds) */ |
66 | private static final long DELAY = 1000; | |
67 | ||
fd3f1eff AM |
68 | // ------------------------------------------------------------------------ |
69 | // Attributes | |
70 | // ------------------------------------------------------------------------ | |
71 | ||
c4767854 AM |
72 | /** List of coalesced requests |
73 | * @since 3.0*/ | |
a4524c1b | 74 | protected final List<TmfCoalescedEventRequest> fPendingCoalescedRequests = new ArrayList<>(); |
fd3f1eff | 75 | |
c4767854 AM |
76 | /** The type of event handled by this provider |
77 | * @since 3.0*/ | |
fd3f1eff AM |
78 | protected Class<? extends ITmfEvent> fType; |
79 | ||
c4767854 AM |
80 | /** Queue of events |
81 | * @since 3.0*/ | |
fd3f1eff AM |
82 | protected BlockingQueue<ITmfEvent> fDataQueue; |
83 | ||
c4767854 AM |
84 | /** Size of the fDataQueue |
85 | * @since 3.0*/ | |
fd3f1eff AM |
86 | protected int fQueueSize = DEFAULT_QUEUE_SIZE; |
87 | ||
88 | private final TmfRequestExecutor fExecutor; | |
89 | ||
90 | private final Object fLock = new Object(); | |
91 | ||
92 | private int fSignalDepth = 0; | |
93 | ||
94 | private int fRequestPendingCounter = 0; | |
8c8bf09f | 95 | |
8967c8c0 BH |
96 | private final Timer fTimer; |
97 | ||
98 | private boolean fIsTimeout = false; | |
99 | ||
00641a97 FC |
100 | // ------------------------------------------------------------------------ |
101 | // Constructors | |
102 | // ------------------------------------------------------------------------ | |
103 | ||
063f0d27 AM |
104 | /** |
105 | * Default constructor | |
106 | */ | |
12c155f5 | 107 | public TmfEventProvider() { |
00641a97 | 108 | super(); |
fd3f1eff | 109 | fQueueSize = DEFAULT_QUEUE_SIZE; |
a4524c1b | 110 | fDataQueue = new LinkedBlockingQueue<>(fQueueSize); |
fd3f1eff | 111 | fExecutor = new TmfRequestExecutor(); |
8967c8c0 | 112 | fTimer = new Timer(); |
12c155f5 | 113 | } |
8c8bf09f | 114 | |
063f0d27 | 115 | /** |
fd3f1eff | 116 | * Initialize this data provider |
063f0d27 AM |
117 | * |
118 | * @param name | |
fd3f1eff | 119 | * Name of the provider |
063f0d27 | 120 | * @param type |
fd3f1eff | 121 | * The type of events that will be handled |
063f0d27 | 122 | */ |
fd3f1eff AM |
123 | public void init(String name, Class<? extends ITmfEvent> type) { |
124 | super.init(name); | |
125 | fType = type; | |
126 | fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<ITmfEvent>(fQueueSize) : new SynchronousQueue<ITmfEvent>(); | |
127 | ||
128 | fExecutor.init(); | |
8967c8c0 | 129 | |
fd3f1eff AM |
130 | fSignalDepth = 0; |
131 | ||
132 | TmfProviderManager.register(fType, this); | |
12c155f5 FC |
133 | } |
134 | ||
063f0d27 | 135 | /** |
fd3f1eff | 136 | * Constructor specifying the event type and the queue size. |
063f0d27 AM |
137 | * |
138 | * @param name | |
fd3f1eff | 139 | * Name of the provider |
063f0d27 | 140 | * @param type |
fd3f1eff | 141 | * Type of event that will be handled |
063f0d27 | 142 | * @param queueSize |
fd3f1eff | 143 | * Size of the event queue |
063f0d27 | 144 | */ |
fd3f1eff AM |
145 | protected TmfEventProvider(String name, Class<? extends ITmfEvent> type, int queueSize) { |
146 | this(); | |
147 | fQueueSize = queueSize; | |
148 | init(name, type); | |
12c155f5 FC |
149 | } |
150 | ||
063f0d27 AM |
151 | /** |
152 | * Copy constructor | |
153 | * | |
154 | * @param other | |
fd3f1eff | 155 | * The other object to copy |
063f0d27 | 156 | */ |
6256d8ad | 157 | public TmfEventProvider(TmfEventProvider other) { |
fd3f1eff AM |
158 | this(); |
159 | init(other.getName(), other.fType); | |
160 | } | |
161 | ||
162 | /** | |
163 | * Standard constructor. Instantiate and initialize at the same time. | |
164 | * | |
165 | * @param name | |
166 | * Name of the provider | |
167 | * @param type | |
168 | * The type of events that will be handled | |
169 | */ | |
170 | public TmfEventProvider(String name, Class<? extends ITmfEvent> type) { | |
171 | this(name, type, DEFAULT_QUEUE_SIZE); | |
172 | } | |
173 | ||
174 | @Override | |
175 | public void dispose() { | |
176 | TmfProviderManager.deregister(fType, this); | |
177 | fExecutor.stop(); | |
178 | super.dispose(); | |
12c155f5 FC |
179 | } |
180 | ||
00641a97 | 181 | // ------------------------------------------------------------------------ |
fd3f1eff AM |
182 | // Accessors |
183 | // ------------------------------------------------------------------------ | |
184 | ||
185 | /** | |
186 | * Get the queue size of this provider | |
187 | * | |
188 | * @return The size of the queue | |
189 | */ | |
190 | public int getQueueSize() { | |
191 | return fQueueSize; | |
192 | } | |
193 | ||
194 | /** | |
195 | * Get the event type this provider handles | |
196 | * | |
197 | * @return The type of ITmfEvent | |
198 | */ | |
0f89d4ba | 199 | public Class<? extends ITmfEvent> getType() { |
fd3f1eff AM |
200 | return fType; |
201 | } | |
202 | ||
203 | // ------------------------------------------------------------------------ | |
204 | // ITmfRequestHandler | |
00641a97 FC |
205 | // ------------------------------------------------------------------------ |
206 | ||
c4767854 AM |
207 | /** |
208 | * @since 3.0 | |
209 | */ | |
5419a136 | 210 | @Override |
fd3f1eff AM |
211 | public void sendRequest(final ITmfEventRequest request) { |
212 | synchronized (fLock) { | |
8967c8c0 BH |
213 | if (request.getExecType() == ExecutionType.FOREGROUND) { |
214 | if ((fSignalDepth > 0) || (fRequestPendingCounter > 0)) { | |
215 | coalesceEventRequest(request); | |
216 | } else { | |
217 | dispatchRequest(request); | |
218 | } | |
219 | return; | |
220 | } | |
221 | ||
222 | /* | |
223 | * For the first background request in the request pending queue | |
224 | * a timer will be started to allow other background requests to | |
225 | * coalesce. | |
226 | */ | |
227 | boolean startTimer = (getNbPendingBackgroundRequests() == 0); | |
228 | coalesceEventRequest(request); | |
229 | if (startTimer) { | |
230 | TimerTask task = new TimerTask() { | |
231 | @Override | |
232 | public void run() { | |
233 | synchronized (fLock) { | |
234 | fIsTimeout = true; | |
235 | fireRequest(); | |
236 | } | |
237 | } | |
238 | }; | |
239 | fTimer.schedule(task, DELAY); | |
fd3f1eff AM |
240 | } |
241 | } | |
242 | } | |
243 | ||
244 | @Override | |
245 | public void fireRequest() { | |
246 | synchronized (fLock) { | |
247 | if (fRequestPendingCounter > 0) { | |
248 | return; | |
249 | } | |
8967c8c0 | 250 | |
fd3f1eff | 251 | if (fPendingCoalescedRequests.size() > 0) { |
8967c8c0 BH |
252 | Iterator<TmfCoalescedEventRequest> iter = fPendingCoalescedRequests.iterator(); |
253 | while (iter.hasNext()) { | |
254 | ExecutionType type = (fIsTimeout ? ExecutionType.BACKGROUND : ExecutionType.FOREGROUND); | |
255 | ITmfEventRequest request = iter.next(); | |
256 | if (type == request.getExecType()) { | |
257 | dispatchRequest(request); | |
258 | iter.remove(); | |
259 | } | |
fd3f1eff | 260 | } |
fd3f1eff | 261 | } |
5419a136 | 262 | } |
5419a136 AM |
263 | } |
264 | ||
fd3f1eff AM |
265 | /** |
266 | * Increments/decrements the pending requests counters and fires the request | |
267 | * if necessary (counter == 0). Used for coalescing requests across multiple | |
268 | * TmfDataProvider's. | |
269 | * | |
270 | * @param isIncrement | |
271 | * Should we increment (true) or decrement (false) the pending | |
272 | * counter | |
273 | */ | |
5419a136 | 274 | @Override |
fd3f1eff AM |
275 | public void notifyPendingRequest(boolean isIncrement) { |
276 | synchronized (fLock) { | |
277 | if (isIncrement) { | |
8967c8c0 | 278 | fRequestPendingCounter++; |
fd3f1eff AM |
279 | } else { |
280 | if (fRequestPendingCounter > 0) { | |
281 | fRequestPendingCounter--; | |
282 | } | |
283 | ||
284 | // fire request if all pending requests are received | |
285 | if (fRequestPendingCounter == 0) { | |
286 | fireRequest(); | |
287 | } | |
288 | } | |
289 | } | |
290 | } | |
291 | ||
292 | // ------------------------------------------------------------------------ | |
293 | // Coalescing | |
294 | // ------------------------------------------------------------------------ | |
295 | ||
296 | /** | |
297 | * Create a new request from an existing one, and add it to the coalesced | |
298 | * requests | |
299 | * | |
300 | * @param request | |
301 | * The request to copy | |
c4767854 | 302 | * @since 3.0 |
fd3f1eff | 303 | */ |
8967c8c0 BH |
304 | protected void newCoalescedEventRequest(ITmfEventRequest request) { |
305 | synchronized (fLock) { | |
672a642a | 306 | TmfCoalescedEventRequest coalescedRequest = new TmfCoalescedEventRequest( |
fd3f1eff AM |
307 | request.getDataType(), |
308 | request.getRange(), | |
309 | request.getIndex(), | |
310 | request.getNbRequested(), | |
311 | request.getExecType()); | |
312 | coalescedRequest.addRequest(request); | |
5419a136 AM |
313 | if (TmfCoreTracer.isRequestTraced()) { |
314 | TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$ | |
315 | TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$ | |
316 | } | |
317 | fPendingCoalescedRequests.add(coalescedRequest); | |
8967c8c0 | 318 | } |
fd3f1eff AM |
319 | } |
320 | ||
321 | /** | |
322 | * Add an existing requests to the list of coalesced ones | |
323 | * | |
324 | * @param request | |
325 | * The request to add to the list | |
c4767854 | 326 | * @since 3.0 |
fd3f1eff AM |
327 | */ |
328 | protected void coalesceEventRequest(ITmfEventRequest request) { | |
329 | synchronized (fLock) { | |
330 | for (TmfCoalescedEventRequest coalescedRequest : fPendingCoalescedRequests) { | |
331 | if (coalescedRequest.isCompatible(request)) { | |
332 | coalescedRequest.addRequest(request); | |
333 | if (TmfCoreTracer.isRequestTraced()) { | |
334 | TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$ | |
335 | TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$ | |
336 | } | |
337 | return; | |
338 | } | |
339 | } | |
340 | newCoalescedEventRequest(request); | |
341 | } | |
342 | } | |
343 | ||
8967c8c0 BH |
344 | /** |
345 | * Gets the number of background requests in pending queue. | |
346 | * | |
347 | * @return the number of background requests in pending queue | |
348 | */ | |
349 | private int getNbPendingBackgroundRequests() { | |
350 | int nbBackgroundRequests = 0; | |
351 | synchronized (fLock) { | |
352 | for (ITmfEventRequest request : fPendingCoalescedRequests) { | |
353 | if (request.getExecType() == ExecutionType.BACKGROUND) { | |
354 | nbBackgroundRequests++; | |
355 | } | |
356 | } | |
357 | } | |
358 | return nbBackgroundRequests; | |
359 | } | |
360 | ||
fd3f1eff AM |
361 | // ------------------------------------------------------------------------ |
362 | // Request processing | |
363 | // ------------------------------------------------------------------------ | |
364 | ||
365 | private void dispatchRequest(final ITmfEventRequest request) { | |
366 | if (request.getExecType() == ExecutionType.FOREGROUND) { | |
367 | queueRequest(request); | |
5419a136 | 368 | } else { |
fd3f1eff AM |
369 | queueBackgroundRequest(request, true); |
370 | } | |
371 | } | |
372 | ||
373 | /** | |
374 | * Queue a request. | |
375 | * | |
376 | * @param request | |
377 | * The data request | |
c4767854 | 378 | * @since 3.0 |
fd3f1eff AM |
379 | */ |
380 | protected void queueRequest(final ITmfEventRequest request) { | |
381 | ||
382 | if (fExecutor.isShutdown()) { | |
383 | request.cancel(); | |
384 | return; | |
385 | } | |
386 | ||
387 | TmfEventThread thread = new TmfEventThread(this, request); | |
388 | ||
389 | if (TmfCoreTracer.isRequestTraced()) { | |
390 | TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$ | |
391 | } | |
392 | ||
393 | fExecutor.execute(thread); | |
394 | } | |
395 | ||
396 | /** | |
397 | * Queue a background request | |
398 | * | |
399 | * @param request | |
400 | * The request | |
401 | * @param indexing | |
402 | * Should we index the chunks | |
403 | * @since 3.0 | |
404 | */ | |
405 | protected void queueBackgroundRequest(final ITmfEventRequest request, final boolean indexing) { | |
406 | queueRequest(request); | |
407 | } | |
408 | ||
409 | /** | |
410 | * Initialize the provider based on the request. The context is provider | |
411 | * specific and will be updated by getNext(). | |
412 | * | |
413 | * @param request | |
414 | * The request | |
415 | * @return An application specific context; null if request can't be | |
416 | * serviced | |
c4767854 | 417 | * @since 3.0 |
fd3f1eff AM |
418 | */ |
419 | public abstract ITmfContext armRequest(ITmfEventRequest request); | |
420 | ||
421 | /** | |
422 | * Checks if the data meets the request completion criteria. | |
423 | * | |
424 | * @param request | |
425 | * The request | |
426 | * @param event | |
427 | * The data to verify | |
428 | * @param nbRead | |
429 | * The number of events read so far | |
430 | * @return true if completion criteria is met | |
c4767854 | 431 | * @since 3.0 |
fd3f1eff AM |
432 | */ |
433 | public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) { | |
434 | boolean requestCompleted = isCompleted2(request, nbRead); | |
435 | if (!requestCompleted) { | |
436 | ITmfTimestamp endTime = request.getRange().getEndTime(); | |
437 | return event.getTimestamp().compareTo(endTime, false) > 0; | |
438 | } | |
439 | return requestCompleted; | |
440 | } | |
441 | ||
442 | private static boolean isCompleted2(ITmfEventRequest request,int nbRead) { | |
443 | return request.isCompleted() || nbRead >= request.getNbRequested(); | |
444 | } | |
445 | ||
446 | // ------------------------------------------------------------------------ | |
447 | // Pass-through's to the request executor | |
448 | // ------------------------------------------------------------------------ | |
449 | ||
450 | /** | |
451 | * @return the shutdown state (i.e. if it is accepting new requests) | |
452 | * @since 2.0 | |
453 | */ | |
454 | protected boolean executorIsShutdown() { | |
455 | return fExecutor.isShutdown(); | |
456 | } | |
457 | ||
458 | /** | |
459 | * @return the termination state | |
460 | * @since 2.0 | |
461 | */ | |
462 | protected boolean executorIsTerminated() { | |
463 | return fExecutor.isTerminated(); | |
464 | } | |
465 | ||
466 | // ------------------------------------------------------------------------ | |
467 | // Signal handlers | |
468 | // ------------------------------------------------------------------------ | |
469 | ||
470 | /** | |
471 | * Handler for the start synch signal | |
472 | * | |
473 | * @param signal | |
474 | * Incoming signal | |
475 | */ | |
476 | @TmfSignalHandler | |
477 | public void startSynch(TmfStartSynchSignal signal) { | |
478 | synchronized (fLock) { | |
479 | fSignalDepth++; | |
480 | } | |
481 | } | |
482 | ||
483 | /** | |
484 | * Handler for the end synch signal | |
485 | * | |
486 | * @param signal | |
487 | * Incoming signal | |
488 | */ | |
489 | @TmfSignalHandler | |
490 | public void endSynch(TmfEndSynchSignal signal) { | |
491 | synchronized (fLock) { | |
492 | fSignalDepth--; | |
493 | if (fSignalDepth == 0) { | |
8967c8c0 | 494 | fIsTimeout = false; |
fd3f1eff AM |
495 | fireRequest(); |
496 | } | |
5419a136 AM |
497 | } |
498 | } | |
951d134a | 499 | |
8c8bf09f | 500 | } |