Fix some null warnings
[deliverable/tracecompass.git] / tmf / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / tmf / core / component / TmfEventProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2015 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation, replace background
11 * requests by preemptable requests
12 * Alexandre Montplaisir - Merge with TmfDataProvider
13 * Bernd Hufmann - Add timer based coalescing for background requests
14 *******************************************************************************/
15
16 package org.eclipse.tracecompass.tmf.core.component;
17
18 import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull;
19
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.Iterator;
23 import java.util.LinkedList;
24 import java.util.List;
25 import java.util.Timer;
26 import java.util.TimerTask;
27
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.eclipse.tracecompass.common.core.NonNullUtils;
30 import org.eclipse.tracecompass.internal.tmf.core.TmfCoreTracer;
31 import org.eclipse.tracecompass.internal.tmf.core.component.TmfEventThread;
32 import org.eclipse.tracecompass.internal.tmf.core.component.TmfProviderManager;
33 import org.eclipse.tracecompass.internal.tmf.core.request.TmfCoalescedEventRequest;
34 import org.eclipse.tracecompass.internal.tmf.core.request.TmfRequestExecutor;
35 import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
36 import org.eclipse.tracecompass.tmf.core.filter.ITmfFilter;
37 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
38 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest.ExecutionType;
39 import org.eclipse.tracecompass.tmf.core.signal.TmfEndSynchSignal;
40 import org.eclipse.tracecompass.tmf.core.signal.TmfSignalHandler;
41 import org.eclipse.tracecompass.tmf.core.signal.TmfStartSynchSignal;
42 import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
43 import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
44
45 /**
46 * An abstract base class that implements ITmfEventProvider.
47 * <p>
48 * This abstract class implements the housekeeping methods to register/
49 * de-register the event provider and to handle generically the event requests.
50 * </p>
51 *
52 * @author Francois Chouinard
53 */
54 public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider, ITmfFilter {
55
56 // ------------------------------------------------------------------------
57 // Constants
58 // ------------------------------------------------------------------------
59
60 /** Default amount of events per request "chunk" */
61 public static final int DEFAULT_BLOCK_SIZE = 50000;
62
63 /** Delay for coalescing background requests (in milli-seconds) */
64 private static final long DELAY = 1000;
65
66 // ------------------------------------------------------------------------
67 // Attributes
68 // ------------------------------------------------------------------------
69
70 /** List of coalesced requests */
71 private final List<TmfCoalescedEventRequest> fPendingCoalescedRequests = new LinkedList<>();
72
73 /** The type of event handled by this provider */
74 private Class<? extends ITmfEvent> fType;
75
76 private final TmfRequestExecutor fExecutor;
77
78 private final Object fLock = new Object();
79
80 private int fSignalDepth = 0;
81
82 private int fRequestPendingCounter = 0;
83
84 private Timer fTimer;
85
86 /** Current timer task */
87 @NonNull private TimerTask fCurrentTask = new TimerTask() { @Override public void run() {} };
88
89 private boolean fIsTimerEnabled;
90
91 /**
92 * The parent event provider.
93 */
94 private TmfEventProvider fParent = null;
95 /**
96 * The list if children event provider.
97 */
98 private final List<TmfEventProvider> fChildren = Collections.synchronizedList(new ArrayList<TmfEventProvider>());
99
100 // ------------------------------------------------------------------------
101 // Constructors
102 // ------------------------------------------------------------------------
103
104 /**
105 * Default constructor
106 */
107 public TmfEventProvider() {
108 super();
109 setTimerEnabled(true);
110 fExecutor = new TmfRequestExecutor();
111 }
112
113 /**
114 * Standard constructor. Instantiate and initialize at the same time.
115 *
116 * @param name
117 * Name of the provider
118 * @param type
119 * The type of events that will be handled
120 */
121 public TmfEventProvider(String name, Class<? extends ITmfEvent> type) {
122 this();
123 init(name, type);
124 }
125
126 /**
127 * Initialize this data provider
128 *
129 * @param name
130 * Name of the provider
131 * @param type
132 * The type of events that will be handled
133 */
134 public void init(String name, Class<? extends ITmfEvent> type) {
135 super.init(name);
136 fType = type;
137 fExecutor.init();
138
139 fSignalDepth = 0;
140
141 synchronized (fLock) {
142 fTimer = new Timer();
143 }
144
145 TmfProviderManager.register(fType, this);
146 }
147
148 @Override
149 public void dispose() {
150 TmfProviderManager.deregister(fType, this);
151 fExecutor.stop();
152 synchronized (fLock) {
153 if (fTimer != null) {
154 fTimer.cancel();
155 }
156 fTimer = null;
157 }
158
159 synchronized (fChildren) {
160 for (TmfEventProvider child : fChildren) {
161 child.dispose();
162 }
163 fChildren.clear();
164 }
165 clearPendingRequests();
166 super.dispose();
167 }
168
169 // ------------------------------------------------------------------------
170 // Accessors
171 // ------------------------------------------------------------------------
172
173 /**
174 * @since 2.0
175 */
176 @Override
177 public Class<? extends ITmfEvent> getEventType() {
178 return fType;
179 }
180
181 // ------------------------------------------------------------------------
182 // ITmfRequestHandler
183 // ------------------------------------------------------------------------
184
185 @Override
186 public void sendRequest(final ITmfEventRequest request) {
187 synchronized (fLock) {
188
189 if (TmfCoreTracer.isRequestTraced()) {
190 TmfCoreTracer.traceRequest(request.getRequestId(), "SENT to provider " + getName()); //$NON-NLS-1$
191 }
192
193 if (request.getProviderFilter() == null) {
194 request.setProviderFilter(this);
195 }
196
197 if (sendWithParent(request)) {
198 return;
199 }
200
201 if (request.getExecType() == ExecutionType.FOREGROUND) {
202 if ((fSignalDepth > 0) || (fRequestPendingCounter > 0)) {
203 coalesceEventRequest(request);
204 } else {
205 queueRequest(request);
206 }
207 return;
208 }
209
210 /*
211 * Dispatch request in case timer is not running.
212 */
213 if (fTimer == null) {
214 queueRequest(request);
215 return;
216 }
217
218 coalesceEventRequest(request);
219
220 if (fIsTimerEnabled) {
221 fCurrentTask.cancel();
222 fCurrentTask = new TimerTask() {
223 @Override
224 public void run() {
225 synchronized (fLock) {
226 fireRequest(true);
227 }
228 }
229 };
230 fTimer.schedule(fCurrentTask, DELAY);
231 }
232 }
233 }
234
235 private void fireRequest(boolean isTimeout) {
236 synchronized (fLock) {
237 if (fRequestPendingCounter > 0) {
238 return;
239 }
240
241 if (fPendingCoalescedRequests.size() > 0) {
242 Iterator<TmfCoalescedEventRequest> iter = fPendingCoalescedRequests.iterator();
243 while (iter.hasNext()) {
244 ExecutionType type = (isTimeout ? ExecutionType.BACKGROUND : ExecutionType.FOREGROUND);
245 ITmfEventRequest request = iter.next();
246 if (type == request.getExecType()) {
247 queueRequest(request);
248 iter.remove();
249 }
250 }
251 }
252 }
253 }
254
255 /**
256 * Increments/decrements the pending requests counters and fires the request
257 * if necessary (counter == 0). Used for coalescing requests across multiple
258 * TmfDataProvider's.
259 *
260 * @param isIncrement
261 * Should we increment (true) or decrement (false) the pending
262 * counter
263 */
264 @Override
265 public void notifyPendingRequest(boolean isIncrement) {
266 synchronized (fLock) {
267 if (isIncrement) {
268 fRequestPendingCounter++;
269 } else {
270 if (fRequestPendingCounter > 0) {
271 fRequestPendingCounter--;
272 }
273
274 // fire request if all pending requests are received
275 if (fRequestPendingCounter == 0) {
276 fireRequest(false);
277 fireRequest(true);
278 }
279 }
280 }
281 }
282
283 // ------------------------------------------------------------------------
284 // Coalescing
285 // ------------------------------------------------------------------------
286
287 /**
288 * Create a new request from an existing one, and add it to the coalesced
289 * requests
290 *
291 * @param request
292 * The request to copy
293 */
294 protected void newCoalescedEventRequest(ITmfEventRequest request) {
295 synchronized (fLock) {
296 TmfCoalescedEventRequest coalescedRequest = new TmfCoalescedEventRequest(
297 request.getDataType(),
298 request.getRange(),
299 request.getIndex(),
300 request.getNbRequested(),
301 request.getExecType());
302 coalescedRequest.addRequest(request);
303 coalescedRequest.setProviderFilter(this);
304 if (TmfCoreTracer.isRequestTraced()) {
305 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
306 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
307 }
308 coalesceChildrenRequests(coalescedRequest);
309 fPendingCoalescedRequests.add(coalescedRequest);
310 }
311 }
312
313 /**
314 * Add an existing requests to the list of coalesced ones
315 *
316 * @param request
317 * The request to add to the list
318 */
319 protected void coalesceEventRequest(ITmfEventRequest request) {
320 synchronized (fLock) {
321 for (TmfCoalescedEventRequest coalescedRequest : getPendingRequests()) {
322 if (coalescedRequest.isCompatible(request)) {
323 coalescedRequest.addRequest(request);
324 if (TmfCoreTracer.isRequestTraced()) {
325 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
326 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
327 }
328 coalesceChildrenRequests(coalescedRequest);
329 return;
330 }
331 }
332 newCoalescedEventRequest(request);
333 }
334 }
335
336 /*
337 * Sends a request with the parent if compatible.
338 */
339 private boolean sendWithParent(final ITmfEventRequest request) {
340 ITmfEventProvider parent = getParent();
341 if (parent instanceof TmfEventProvider) {
342 return ((TmfEventProvider) parent).sendIfCompatible(request);
343 }
344 return false;
345 }
346
347 /*
348 * Sends a request if compatible with a pending coalesced request.
349 */
350 private boolean sendIfCompatible(ITmfEventRequest request) {
351 synchronized (fLock) {
352 for (TmfCoalescedEventRequest coalescedRequest : getPendingRequests()) {
353 if (coalescedRequest.isCompatible(request)) {
354 // Send so it can be coalesced with the parent(s)
355 sendRequest(request);
356 return true;
357 }
358 }
359 }
360 return sendWithParent(request);
361 }
362
363 /*
364 * Coalesces children requests with given request if compatible.
365 */
366 private void coalesceChildrenRequests(final TmfCoalescedEventRequest request) {
367 synchronized (fChildren) {
368 for (TmfEventProvider child : fChildren) {
369 child.coalesceCompatibleRequests(request);
370 }
371 }
372 }
373
374
375 /*
376 * Coalesces all pending requests that are compatible with coalesced request.
377 */
378 private void coalesceCompatibleRequests(TmfCoalescedEventRequest request) {
379 Iterator<TmfCoalescedEventRequest> iter = getPendingRequests().iterator();
380 while (iter.hasNext()) {
381 TmfCoalescedEventRequest pendingRequest = iter.next();
382 if (request.isCompatible(pendingRequest)) {
383 request.addRequest(pendingRequest);
384 if (TmfCoreTracer.isRequestTraced()) {
385 TmfCoreTracer.traceRequest(pendingRequest.getRequestId(), "COALESCED with " + request.getRequestId()); //$NON-NLS-1$
386 TmfCoreTracer.traceRequest(request.getRequestId(), "now contains " + request.getSubRequestIds()); //$NON-NLS-1$
387 }
388 iter.remove();
389 }
390 }
391 }
392
393 // ------------------------------------------------------------------------
394 // Request processing
395 // ------------------------------------------------------------------------
396
397 /**
398 * Queue a request.
399 *
400 * @param request
401 * The data request
402 */
403 protected void queueRequest(final ITmfEventRequest request) {
404
405 if (fExecutor.isShutdown()) {
406 request.cancel();
407 return;
408 }
409
410 TmfEventThread thread = new TmfEventThread(this, request);
411
412 if (TmfCoreTracer.isRequestTraced()) {
413 TmfCoreTracer.traceRequest(request.getRequestId(), "QUEUED"); //$NON-NLS-1$
414 }
415
416 fExecutor.execute(thread);
417 }
418
419 /**
420 * Initialize the provider based on the request. The context is provider
421 * specific and will be updated by getNext().
422 *
423 * @param request
424 * The request
425 * @return An application specific context; null if request can't be
426 * serviced
427 */
428 public abstract ITmfContext armRequest(ITmfEventRequest request);
429
430 /**
431 * Checks if the data meets the request completion criteria.
432 *
433 * @param request
434 * The request
435 * @param event
436 * The data to verify
437 * @param nbRead
438 * The number of events read so far
439 * @return true if completion criteria is met
440 */
441 public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) {
442 boolean requestCompleted = isCompleted2(request, nbRead);
443 if (!requestCompleted) {
444 ITmfTimestamp endTime = request.getRange().getEndTime();
445 return event.getTimestamp().compareTo(endTime) > 0;
446 }
447 return requestCompleted;
448 }
449
450 private static boolean isCompleted2(ITmfEventRequest request,int nbRead) {
451 return request.isCompleted() || nbRead >= request.getNbRequested();
452 }
453
454 // ------------------------------------------------------------------------
455 // Pass-through's to the request executor
456 // ------------------------------------------------------------------------
457
458 /**
459 * @return the shutdown state (i.e. if it is accepting new requests)
460 */
461 protected boolean executorIsShutdown() {
462 return fExecutor.isShutdown();
463 }
464
465 /**
466 * @return the termination state
467 */
468 protected boolean executorIsTerminated() {
469 return fExecutor.isTerminated();
470 }
471
472 // ------------------------------------------------------------------------
473 // Signal handlers
474 // ------------------------------------------------------------------------
475
476 /**
477 * Handler for the start synch signal
478 *
479 * @param signal
480 * Incoming signal
481 */
482 @TmfSignalHandler
483 public void startSynch(TmfStartSynchSignal signal) {
484 synchronized (fLock) {
485 fSignalDepth++;
486 }
487 }
488
489 /**
490 * Handler for the end synch signal
491 *
492 * @param signal
493 * Incoming signal
494 */
495 @TmfSignalHandler
496 public void endSynch(TmfEndSynchSignal signal) {
497 synchronized (fLock) {
498 fSignalDepth--;
499 if (fSignalDepth == 0) {
500 fireRequest(false);
501 }
502 }
503 }
504
505 @Override
506 public ITmfEventProvider getParent() {
507 synchronized (fLock) {
508 return fParent;
509 }
510 }
511
512 @Override
513 public void setParent(ITmfEventProvider parent) {
514 if (!(parent instanceof TmfEventProvider)) {
515 throw new IllegalArgumentException();
516 }
517
518 synchronized (fLock) {
519 fParent = (TmfEventProvider) parent;
520 }
521 }
522
523 @Override
524 public List<ITmfEventProvider> getChildren() {
525 synchronized (fChildren) {
526 List<ITmfEventProvider> list = new ArrayList<>();
527 list.addAll(fChildren);
528 return list;
529 }
530 }
531
532 @Override
533 public <T extends ITmfEventProvider> List<T> getChildren(Class<T> clazz) {
534 List<@NonNull T> list = new ArrayList<>();
535 synchronized (fChildren) {
536 for (TmfEventProvider child : fChildren) {
537 if (clazz.isAssignableFrom(child.getClass())) {
538 list.add(checkNotNull(clazz.cast(child)));
539 }
540 }
541 }
542 return list;
543 }
544
545 @Override
546 public ITmfEventProvider getChild(String name) {
547 synchronized (fChildren) {
548 for (TmfEventProvider child : fChildren) {
549 if (child.getName().equals(name)) {
550 return child;
551 }
552 }
553 }
554 return null;
555 }
556
557 @Override
558 public ITmfEventProvider getChild(int index) {
559 return NonNullUtils.checkNotNull(fChildren.get(index));
560 }
561
562 @Override
563 public void addChild(ITmfEventProvider child) {
564 if (!(child instanceof TmfEventProvider)) {
565 throw new IllegalArgumentException();
566 }
567 child.setParent(this);
568 fChildren.add((TmfEventProvider)child);
569 }
570
571 @Override
572 public int getNbChildren() {
573 return fChildren.size();
574 }
575
576 /**
577 * Returns true if an event was provided by this event provider or one of
578 * its children event providers else false.
579 *
580 * @param event
581 * the event to check
582 * @return <code>true</code> if event was provided by this provider or one
583 * of its children else <code>false</code>
584 */
585 @Override
586 public boolean matches(ITmfEvent event) {
587 if ((event.getTrace() == this)) {
588 return true;
589 }
590 if (fChildren.size() > 0) {
591 synchronized (fLock) {
592 List <TmfEventProvider> children = getChildren(TmfEventProvider.class);
593 for (TmfEventProvider child : children) {
594 if (child.matches(event)) {
595 return true;
596 }
597 }
598 }
599 }
600 return false;
601 }
602
603 // ------------------------------------------------------------------------
604 // Debug code (will also used in tests using reflection)
605 // ------------------------------------------------------------------------
606
607 /**
608 * Gets a list of all pending requests. Debug code.
609 *
610 * @return a list of all pending requests
611 */
612 private List<TmfCoalescedEventRequest> getPendingRequests() {
613 return fPendingCoalescedRequests;
614 }
615
616 /**
617 * Clears all pending requests. Debug code.
618 */
619 private void clearPendingRequests() {
620 fPendingCoalescedRequests.clear();
621 }
622
623 /**
624 * Enables/disables the timer. Debug code.
625 *
626 * @param enabled
627 * the enable flag to set
628 */
629 private void setTimerEnabled(Boolean enabled) {
630 fIsTimerEnabled = enabled;
631 }
632 }
This page took 0.047614 seconds and 5 git commands to generate.