tmf: only dispatch events to relevant event providers
[deliverable/tracecompass.git] / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / tmf / core / component / TmfEventProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2014 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation, replace background
11 * requests by preemptable requests
12 * Alexandre Montplaisir - Merge with TmfDataProvider
13 * Bernd Hufmann - Add timer based coalescing for background requests
14 *******************************************************************************/
15
16 package org.eclipse.tracecompass.tmf.core.component;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Timer;
24 import java.util.TimerTask;
25
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.eclipse.tracecompass.internal.tmf.core.TmfCoreTracer;
28 import org.eclipse.tracecompass.internal.tmf.core.component.TmfEventThread;
29 import org.eclipse.tracecompass.internal.tmf.core.component.TmfProviderManager;
30 import org.eclipse.tracecompass.internal.tmf.core.request.TmfCoalescedEventRequest;
31 import org.eclipse.tracecompass.internal.tmf.core.request.TmfRequestExecutor;
32 import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
33 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
34 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest.ExecutionType;
35 import org.eclipse.tracecompass.tmf.core.signal.TmfEndSynchSignal;
36 import org.eclipse.tracecompass.tmf.core.signal.TmfSignalHandler;
37 import org.eclipse.tracecompass.tmf.core.signal.TmfStartSynchSignal;
38 import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
39 import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
40
41 /**
42 * An abstract base class that implements ITmfEventProvider.
43 * <p>
44 * This abstract class implements the housekeeping methods to register/
45 * de-register the event provider and to handle generically the event requests.
46 * </p>
47 *
48 * @author Francois Chouinard
49 * @since 3.0
50 */
51 public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider {
52
53 // ------------------------------------------------------------------------
54 // Constants
55 // ------------------------------------------------------------------------
56
57 /** Default amount of events per request "chunk"
58 * @since 3.0 */
59 public static final int DEFAULT_BLOCK_SIZE = 50000;
60
61 /** Delay for coalescing background requests (in milli-seconds) */
62 private static final long DELAY = 1000;
63
64 // ------------------------------------------------------------------------
65 // Attributes
66 // ------------------------------------------------------------------------
67
68 /** List of coalesced requests */
69 private final List<TmfCoalescedEventRequest> fPendingCoalescedRequests = new LinkedList<>();
70
71 /** The type of event handled by this provider */
72 private Class<? extends ITmfEvent> fType;
73
74 private final TmfRequestExecutor fExecutor;
75
76 private final Object fLock = new Object();
77
78 private int fSignalDepth = 0;
79
80 private int fRequestPendingCounter = 0;
81
82 private Timer fTimer;
83
84 /** Current timer task */
85 @NonNull private TimerTask fCurrentTask = new TimerTask() { @Override public void run() {} };
86
87 private boolean fIsTimerEnabled;
88
89 /**
90 * The parent event provider.
91 */
92 private TmfEventProvider fParent = null;
93 /**
94 * The list if children event provider.
95 */
96 private final List<TmfEventProvider> fChildren = Collections.synchronizedList(new ArrayList<TmfEventProvider>());
97
98 // ------------------------------------------------------------------------
99 // Constructors
100 // ------------------------------------------------------------------------
101
102 /**
103 * Default constructor
104 */
105 public TmfEventProvider() {
106 super();
107 setTimerEnabled(true);
108 fExecutor = new TmfRequestExecutor();
109 }
110
111 /**
112 * Standard constructor. Instantiate and initialize at the same time.
113 *
114 * @param name
115 * Name of the provider
116 * @param type
117 * The type of events that will be handled
118 */
119 public TmfEventProvider(String name, Class<? extends ITmfEvent> type) {
120 this();
121 init(name, type);
122 }
123
124 /**
125 * Initialize this data provider
126 *
127 * @param name
128 * Name of the provider
129 * @param type
130 * The type of events that will be handled
131 */
132 public void init(String name, Class<? extends ITmfEvent> type) {
133 super.init(name);
134 fType = type;
135 fExecutor.init();
136
137 fSignalDepth = 0;
138
139 synchronized (fLock) {
140 fTimer = new Timer();
141 }
142
143 TmfProviderManager.register(fType, this);
144 }
145
146 @Override
147 public void dispose() {
148 TmfProviderManager.deregister(fType, this);
149 fExecutor.stop();
150 synchronized (fLock) {
151 if (fTimer != null) {
152 fTimer.cancel();
153 }
154 fTimer = null;
155 }
156
157 synchronized (fChildren) {
158 for (TmfEventProvider child : fChildren) {
159 child.dispose();
160 }
161 fChildren.clear();
162 }
163 clearPendingRequests();
164 super.dispose();
165 }
166
167 // ------------------------------------------------------------------------
168 // Accessors
169 // ------------------------------------------------------------------------
170
171 /**
172 * Get the event type this provider handles
173 *
174 * @return The type of ITmfEvent
175 */
176 public Class<? extends ITmfEvent> getType() {
177 return fType;
178 }
179
180 // ------------------------------------------------------------------------
181 // ITmfRequestHandler
182 // ------------------------------------------------------------------------
183
184 /**
185 * @since 3.0
186 */
187 @Override
188 public void sendRequest(final ITmfEventRequest request) {
189 synchronized (fLock) {
190
191 if (TmfCoreTracer.isRequestTraced()) {
192 TmfCoreTracer.traceRequest(request.getRequestId(), "SENT to provider " + getName()); //$NON-NLS-1$
193 }
194
195 if (request.getEventProvider() == null) {
196 request.setEventProvider(this);
197 }
198
199 if (sendWithParent(request)) {
200 return;
201 }
202
203 if (request.getExecType() == ExecutionType.FOREGROUND) {
204 if ((fSignalDepth > 0) || (fRequestPendingCounter > 0)) {
205 coalesceEventRequest(request);
206 } else {
207 queueRequest(request);
208 }
209 return;
210 }
211
212 /*
213 * Dispatch request in case timer is not running.
214 */
215 if (fTimer == null) {
216 queueRequest(request);
217 return;
218 }
219
220 coalesceEventRequest(request);
221
222 if (fIsTimerEnabled) {
223 fCurrentTask.cancel();
224 fCurrentTask = new TimerTask() {
225 @Override
226 public void run() {
227 synchronized (fLock) {
228 fireRequest(true);
229 }
230 }
231 };
232 fTimer.schedule(fCurrentTask, DELAY);
233 }
234 }
235 }
236
237 private void fireRequest(boolean isTimeout) {
238 synchronized (fLock) {
239 if (fRequestPendingCounter > 0) {
240 return;
241 }
242
243 if (fPendingCoalescedRequests.size() > 0) {
244 Iterator<TmfCoalescedEventRequest> iter = fPendingCoalescedRequests.iterator();
245 while (iter.hasNext()) {
246 ExecutionType type = (isTimeout ? ExecutionType.BACKGROUND : ExecutionType.FOREGROUND);
247 ITmfEventRequest request = iter.next();
248 if (type == request.getExecType()) {
249 queueRequest(request);
250 iter.remove();
251 }
252 }
253 }
254 }
255 }
256
257 /**
258 * Increments/decrements the pending requests counters and fires the request
259 * if necessary (counter == 0). Used for coalescing requests across multiple
260 * TmfDataProvider's.
261 *
262 * @param isIncrement
263 * Should we increment (true) or decrement (false) the pending
264 * counter
265 */
266 @Override
267 public void notifyPendingRequest(boolean isIncrement) {
268 synchronized (fLock) {
269 if (isIncrement) {
270 fRequestPendingCounter++;
271 } else {
272 if (fRequestPendingCounter > 0) {
273 fRequestPendingCounter--;
274 }
275
276 // fire request if all pending requests are received
277 if (fRequestPendingCounter == 0) {
278 fireRequest(false);
279 fireRequest(true);
280 }
281 }
282 }
283 }
284
285 // ------------------------------------------------------------------------
286 // Coalescing
287 // ------------------------------------------------------------------------
288
289 /**
290 * Create a new request from an existing one, and add it to the coalesced
291 * requests
292 *
293 * @param request
294 * The request to copy
295 * @since 3.0
296 */
297 protected void newCoalescedEventRequest(ITmfEventRequest request) {
298 synchronized (fLock) {
299 TmfCoalescedEventRequest coalescedRequest = new TmfCoalescedEventRequest(
300 request.getDataType(),
301 request.getRange(),
302 request.getIndex(),
303 request.getNbRequested(),
304 request.getExecType());
305 coalescedRequest.addRequest(request);
306 coalescedRequest.setEventProvider(this);
307 if (TmfCoreTracer.isRequestTraced()) {
308 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
309 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
310 }
311 coalesceChildrenRequests(coalescedRequest);
312 fPendingCoalescedRequests.add(coalescedRequest);
313 }
314 }
315
316 /**
317 * Add an existing requests to the list of coalesced ones
318 *
319 * @param request
320 * The request to add to the list
321 * @since 3.0
322 */
323 protected void coalesceEventRequest(ITmfEventRequest request) {
324 synchronized (fLock) {
325 for (TmfCoalescedEventRequest coalescedRequest : getPendingRequests()) {
326 if (coalescedRequest.isCompatible(request)) {
327 coalescedRequest.addRequest(request);
328 if (TmfCoreTracer.isRequestTraced()) {
329 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
330 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
331 }
332 coalesceChildrenRequests(coalescedRequest);
333 return;
334 }
335 }
336 newCoalescedEventRequest(request);
337 }
338 }
339
340 /*
341 * Sends a request with the parent if compatible.
342 */
343 private boolean sendWithParent(final ITmfEventRequest request) {
344 ITmfEventProvider parent = getParent();
345 if (parent instanceof TmfEventProvider) {
346 return ((TmfEventProvider) parent).sendIfCompatible(request);
347 }
348 return false;
349 }
350
351 /*
352 * Sends a request if compatible with a pending coalesced request.
353 */
354 private boolean sendIfCompatible(ITmfEventRequest request) {
355 synchronized (fLock) {
356 for (TmfCoalescedEventRequest coalescedRequest : getPendingRequests()) {
357 if (coalescedRequest.isCompatible(request)) {
358 // Send so it can be coalesced with the parent(s)
359 sendRequest(request);
360 return true;
361 }
362 }
363 }
364 return sendWithParent(request);
365 }
366
367 /*
368 * Coalesces children requests with given request if compatible.
369 */
370 private void coalesceChildrenRequests(final TmfCoalescedEventRequest request) {
371 synchronized (fChildren) {
372 for (TmfEventProvider child : fChildren) {
373 child.coalesceCompatibleRequests(request);
374 }
375 }
376 }
377
378
379 /*
380 * Coalesces all pending requests that are compatible with coalesced request.
381 */
382 private void coalesceCompatibleRequests(TmfCoalescedEventRequest request) {
383 Iterator<TmfCoalescedEventRequest> iter = getPendingRequests().iterator();
384 while (iter.hasNext()) {
385 TmfCoalescedEventRequest pendingRequest = iter.next();
386 if (request.isCompatible(pendingRequest)) {
387 request.addRequest(pendingRequest);
388 if (TmfCoreTracer.isRequestTraced()) {
389 TmfCoreTracer.traceRequest(pendingRequest.getRequestId(), "COALESCED with " + request.getRequestId()); //$NON-NLS-1$
390 TmfCoreTracer.traceRequest(request.getRequestId(), "now contains " + request.getSubRequestIds()); //$NON-NLS-1$
391 }
392 iter.remove();
393 }
394 }
395 }
396
397 // ------------------------------------------------------------------------
398 // Request processing
399 // ------------------------------------------------------------------------
400
401 /**
402 * Queue a request.
403 *
404 * @param request
405 * The data request
406 * @since 3.0
407 */
408 protected void queueRequest(final ITmfEventRequest request) {
409
410 if (fExecutor.isShutdown()) {
411 request.cancel();
412 return;
413 }
414
415 TmfEventThread thread = new TmfEventThread(this, request);
416
417 if (TmfCoreTracer.isRequestTraced()) {
418 TmfCoreTracer.traceRequest(request.getRequestId(), "QUEUED"); //$NON-NLS-1$
419 }
420
421 fExecutor.execute(thread);
422 }
423
424 /**
425 * Initialize the provider based on the request. The context is provider
426 * specific and will be updated by getNext().
427 *
428 * @param request
429 * The request
430 * @return An application specific context; null if request can't be
431 * serviced
432 * @since 3.0
433 */
434 public abstract ITmfContext armRequest(ITmfEventRequest request);
435
436 /**
437 * Checks if the data meets the request completion criteria.
438 *
439 * @param request
440 * The request
441 * @param event
442 * The data to verify
443 * @param nbRead
444 * The number of events read so far
445 * @return true if completion criteria is met
446 * @since 3.0
447 */
448 public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) {
449 boolean requestCompleted = isCompleted2(request, nbRead);
450 if (!requestCompleted) {
451 ITmfTimestamp endTime = request.getRange().getEndTime();
452 return event.getTimestamp().compareTo(endTime) > 0;
453 }
454 return requestCompleted;
455 }
456
457 private static boolean isCompleted2(ITmfEventRequest request,int nbRead) {
458 return request.isCompleted() || nbRead >= request.getNbRequested();
459 }
460
461 // ------------------------------------------------------------------------
462 // Pass-through's to the request executor
463 // ------------------------------------------------------------------------
464
465 /**
466 * @return the shutdown state (i.e. if it is accepting new requests)
467 * @since 2.0
468 */
469 protected boolean executorIsShutdown() {
470 return fExecutor.isShutdown();
471 }
472
473 /**
474 * @return the termination state
475 * @since 2.0
476 */
477 protected boolean executorIsTerminated() {
478 return fExecutor.isTerminated();
479 }
480
481 // ------------------------------------------------------------------------
482 // Signal handlers
483 // ------------------------------------------------------------------------
484
485 /**
486 * Handler for the start synch signal
487 *
488 * @param signal
489 * Incoming signal
490 */
491 @TmfSignalHandler
492 public void startSynch(TmfStartSynchSignal signal) {
493 synchronized (fLock) {
494 fSignalDepth++;
495 }
496 }
497
498 /**
499 * Handler for the end synch signal
500 *
501 * @param signal
502 * Incoming signal
503 */
504 @TmfSignalHandler
505 public void endSynch(TmfEndSynchSignal signal) {
506 synchronized (fLock) {
507 fSignalDepth--;
508 if (fSignalDepth == 0) {
509 fireRequest(false);
510 }
511 }
512 }
513
514 @Override
515 public ITmfEventProvider getParent() {
516 synchronized (fLock) {
517 return fParent;
518 }
519 }
520
521 @Override
522 public void setParent(ITmfEventProvider parent) {
523 if (!(parent instanceof TmfEventProvider)) {
524 throw new IllegalArgumentException();
525 }
526
527 synchronized (fLock) {
528 fParent = (TmfEventProvider) parent;
529 }
530 }
531
532 @Override
533 public List<ITmfEventProvider> getChildren() {
534 synchronized (fChildren) {
535 List<ITmfEventProvider> list = new ArrayList<>();
536 list.addAll(fChildren);
537 return list;
538 }
539 }
540
541 @Override
542 public <T extends ITmfEventProvider> List<T> getChildren(Class<T> clazz) {
543 List<T> list = new ArrayList<>();
544 synchronized (fChildren) {
545 for (TmfEventProvider child : fChildren) {
546 if (clazz.isAssignableFrom(child.getClass())) {
547 list.add(clazz.cast(child));
548 }
549 }
550 }
551 return list;
552 }
553
554 @Override
555 public ITmfEventProvider getChild(String name) {
556 synchronized (fChildren) {
557 for (TmfEventProvider child : fChildren) {
558 if (child.getName().equals(name)) {
559 return child;
560 }
561 }
562 }
563 return null;
564 }
565
566 @SuppressWarnings("null")
567 @Override
568 public ITmfEventProvider getChild(int index) {
569 return fChildren.get(index);
570 }
571
572 @Override
573 public void addChild(ITmfEventProvider child) {
574 if (!(child instanceof TmfEventProvider)) {
575 throw new IllegalArgumentException();
576 }
577 fChildren.add((TmfEventProvider)child);
578 }
579
580 @Override
581 public int getNbChildren() {
582 return fChildren.size();
583 }
584
585 @Override
586 public boolean providesEvent(ITmfEvent event) {
587 if ((event.getTrace() == this)) {
588 return true;
589 }
590 if (fChildren.size() > 0) {
591 synchronized (fLock) {
592 List <TmfEventProvider> children = getChildren(TmfEventProvider.class);
593 for (TmfEventProvider child : children) {
594 if (child.providesEvent(event)) {
595 return true;
596 }
597 }
598 }
599 }
600 return false;
601 }
602
603 // ------------------------------------------------------------------------
604 // Debug code (will also used in tests using reflection)
605 // ------------------------------------------------------------------------
606
607 /**
608 * Gets a list of all pending requests. Debug code.
609 *
610 * @return a list of all pending requests
611 */
612 private List<TmfCoalescedEventRequest> getPendingRequests() {
613 return fPendingCoalescedRequests;
614 }
615
616 /**
617 * Clears all pending requests. Debug code.
618 */
619 private void clearPendingRequests() {
620 fPendingCoalescedRequests.clear();
621 }
622
623 /**
624 * Enables/disables the timer. Debug code.
625 *
626 * @param enabled
627 * the enable flag to set
628 */
629 private void setTimerEnabled(Boolean enabled) {
630 fIsTimerEnabled = enabled;
631 }
632 }
This page took 0.045893 seconds and 6 git commands to generate.