tmf: only dispatch events to relevant event providers
[deliverable/tracecompass.git] / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / tmf / core / component / TmfEventProvider.java
CommitLineData
8c8bf09f 1/*******************************************************************************
8967c8c0 2 * Copyright (c) 2009, 2014 Ericsson
0283f7ff 3 *
8c8bf09f
ASL
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
0283f7ff 8 *
8c8bf09f 9 * Contributors:
fd3f1eff
AM
10 * Francois Chouinard - Initial API and implementation, replace background
11 * requests by preemptable requests
12 * Alexandre Montplaisir - Merge with TmfDataProvider
8967c8c0 13 * Bernd Hufmann - Add timer based coalescing for background requests
8c8bf09f
ASL
14 *******************************************************************************/
15
2bdf0193 16package org.eclipse.tracecompass.tmf.core.component;
8c8bf09f 17
d77f31da
BH
18import java.util.ArrayList;
19import java.util.Collections;
8967c8c0 20import java.util.Iterator;
f45257df 21import java.util.LinkedList;
fd3f1eff 22import java.util.List;
8967c8c0
BH
23import java.util.Timer;
24import java.util.TimerTask;
fd3f1eff 25
6ae0ee68 26import org.eclipse.jdt.annotation.NonNull;
2bdf0193
AM
27import org.eclipse.tracecompass.internal.tmf.core.TmfCoreTracer;
28import org.eclipse.tracecompass.internal.tmf.core.component.TmfEventThread;
29import org.eclipse.tracecompass.internal.tmf.core.component.TmfProviderManager;
30import org.eclipse.tracecompass.internal.tmf.core.request.TmfCoalescedEventRequest;
31import org.eclipse.tracecompass.internal.tmf.core.request.TmfRequestExecutor;
32import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
33import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
34import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest.ExecutionType;
35import org.eclipse.tracecompass.tmf.core.signal.TmfEndSynchSignal;
36import org.eclipse.tracecompass.tmf.core.signal.TmfSignalHandler;
37import org.eclipse.tracecompass.tmf.core.signal.TmfStartSynchSignal;
38import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
39import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
40
41/**
fd3f1eff
AM
42 * An abstract base class that implements ITmfEventProvider.
43 * <p>
44 * This abstract class implements the housekeeping methods to register/
45 * de-register the event provider and to handle generically the event requests.
46 * </p>
0283f7ff 47 *
8fd82db5 48 * @author Francois Chouinard
c4767854 49 * @since 3.0
8c8bf09f 50 */
fd3f1eff
AM
51public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider {
52
53 // ------------------------------------------------------------------------
54 // Constants
55 // ------------------------------------------------------------------------
56
c4767854
AM
57 /** Default amount of events per request "chunk"
58 * @since 3.0 */
fd3f1eff
AM
59 public static final int DEFAULT_BLOCK_SIZE = 50000;
60
8967c8c0
BH
61 /** Delay for coalescing background requests (in milliseconds) */
62 private static final long DELAY = 1000;
63
fd3f1eff
AM
64 // ------------------------------------------------------------------------
65 // Attributes
66 // ------------------------------------------------------------------------
67
f45257df
AM
68 /** List of coalesced requests */
69 private final List<TmfCoalescedEventRequest> fPendingCoalescedRequests = new LinkedList<>();
fd3f1eff 70
f45257df
AM
71 /** The type of event handled by this provider */
72 private Class<? extends ITmfEvent> fType;
fd3f1eff
AM
73
74 private final TmfRequestExecutor fExecutor;
75
76 private final Object fLock = new Object();
77
78 private int fSignalDepth = 0;
79
80 private int fRequestPendingCounter = 0;
8c8bf09f 81
506219d0 82 private Timer fTimer;
8967c8c0 83
6ae0ee68
BH
84 /** Current timer task */
85 @NonNull private TimerTask fCurrentTask = new TimerTask() { @Override public void run() {} };
86
2b5c3b7e 87 private boolean fIsTimerEnabled;
8967c8c0 88
d77f31da
BH
89 /**
90 * The parent event provider.
91 */
92 private TmfEventProvider fParent = null;
93 /**
94 * The list of child event providers.
95 */
96 private final List<TmfEventProvider> fChildren = Collections.synchronizedList(new ArrayList<TmfEventProvider>());
97
00641a97
FC
98 // ------------------------------------------------------------------------
99 // Constructors
100 // ------------------------------------------------------------------------
101
063f0d27
AM
102 /**
103 * Default constructor
104 */
12c155f5 105 public TmfEventProvider() {
00641a97 106 super();
2b5c3b7e 107 setTimerEnabled(true);
fd3f1eff 108 fExecutor = new TmfRequestExecutor();
12c155f5 109 }
8c8bf09f 110
063f0d27 111 /**
f45257df 112 * Standard constructor. Instantiate and initialize at the same time.
063f0d27
AM
113 *
114 * @param name
fd3f1eff 115 * Name of the provider
063f0d27 116 * @param type
fd3f1eff 117 * The type of events that will be handled
063f0d27 118 */
f45257df 119 public TmfEventProvider(String name, Class<? extends ITmfEvent> type) {
fd3f1eff 120 this();
fd3f1eff 121 init(name, type);
12c155f5
FC
122 }
123
063f0d27 124 /**
f45257df 125 * Initialize this data provider
fd3f1eff
AM
126 *
127 * @param name
128 * Name of the provider
129 * @param type
130 * The type of events that will be handled
131 */
f45257df
AM
132 public void init(String name, Class<? extends ITmfEvent> type) {
133 super.init(name);
134 fType = type;
135 fExecutor.init();
136
137 fSignalDepth = 0;
506219d0
BH
138
139 synchronized (fLock) {
140 fTimer = new Timer();
141 }
142
f45257df 143 TmfProviderManager.register(fType, this);
fd3f1eff
AM
144 }
145
146 @Override
147 public void dispose() {
148 TmfProviderManager.deregister(fType, this);
149 fExecutor.stop();
506219d0
BH
150 synchronized (fLock) {
151 if (fTimer != null) {
152 fTimer.cancel();
153 }
154 fTimer = null;
155 }
d77f31da
BH
156
157 synchronized (fChildren) {
158 for (TmfEventProvider child : fChildren) {
159 child.dispose();
160 }
161 fChildren.clear();
162 }
2b5c3b7e 163 clearPendingRequests();
fd3f1eff 164 super.dispose();
12c155f5
FC
165 }
166
00641a97 167 // ------------------------------------------------------------------------
fd3f1eff
AM
168 // Accessors
169 // ------------------------------------------------------------------------
170
fd3f1eff
AM
171 /**
172 * Get the event type this provider handles
173 *
174 * @return The type of ITmfEvent
175 */
0f89d4ba 176 public Class<? extends ITmfEvent> getType() {
fd3f1eff
AM
177 return fType;
178 }
179
180 // ------------------------------------------------------------------------
181 // ITmfRequestHandler
00641a97
FC
182 // ------------------------------------------------------------------------
183
c4767854
AM
184 /**
185 * @since 3.0
186 */
5419a136 187 @Override
fd3f1eff
AM
188 public void sendRequest(final ITmfEventRequest request) {
189 synchronized (fLock) {
5a597798
GB
190
191 if (TmfCoreTracer.isRequestTraced()) {
192 TmfCoreTracer.traceRequest(request.getRequestId(), "SENT to provider " + getName()); //$NON-NLS-1$
193 }
194
6badfac0
BH
195 if (request.getEventProvider() == null) {
196 request.setEventProvider(this);
197 }
198
6ae0ee68
BH
199 if (sendWithParent(request)) {
200 return;
201 }
202
8967c8c0
BH
203 if (request.getExecType() == ExecutionType.FOREGROUND) {
204 if ((fSignalDepth > 0) || (fRequestPendingCounter > 0)) {
205 coalesceEventRequest(request);
206 } else {
f45257df 207 queueRequest(request);
8967c8c0
BH
208 }
209 return;
210 }
211
506219d0
BH
212 /*
213 * Dispatch request in case timer is not running.
214 */
215 if (fTimer == null) {
216 queueRequest(request);
217 return;
218 }
219
8967c8c0 220 coalesceEventRequest(request);
6ae0ee68 221
2b5c3b7e
BH
222 if (fIsTimerEnabled) {
223 fCurrentTask.cancel();
224 fCurrentTask = new TimerTask() {
225 @Override
226 public void run() {
227 synchronized (fLock) {
228 fireRequest(true);
229 }
8967c8c0 230 }
2b5c3b7e
BH
231 };
232 fTimer.schedule(fCurrentTask, DELAY);
233 }
fd3f1eff
AM
234 }
235 }
236
2b5c3b7e 237 private void fireRequest(boolean isTimeout) {
fd3f1eff
AM
238 synchronized (fLock) {
239 if (fRequestPendingCounter > 0) {
240 return;
241 }
8967c8c0 242
fd3f1eff 243 if (fPendingCoalescedRequests.size() > 0) {
8967c8c0
BH
244 Iterator<TmfCoalescedEventRequest> iter = fPendingCoalescedRequests.iterator();
245 while (iter.hasNext()) {
2b5c3b7e 246 ExecutionType type = (isTimeout ? ExecutionType.BACKGROUND : ExecutionType.FOREGROUND);
8967c8c0
BH
247 ITmfEventRequest request = iter.next();
248 if (type == request.getExecType()) {
f45257df 249 queueRequest(request);
8967c8c0
BH
250 iter.remove();
251 }
fd3f1eff 252 }
fd3f1eff 253 }
5419a136 254 }
5419a136
AM
255 }
256
fd3f1eff
AM
257 /**
258 * Increments/decrements the pending requests counters and fires the request
259 * if necessary (counter == 0). Used for coalescing requests across multiple
260 * TmfDataProvider's.
261 *
262 * @param isIncrement
263 * Should we increment (true) or decrement (false) the pending
264 * counter
265 */
5419a136 266 @Override
fd3f1eff
AM
267 public void notifyPendingRequest(boolean isIncrement) {
268 synchronized (fLock) {
269 if (isIncrement) {
8967c8c0 270 fRequestPendingCounter++;
fd3f1eff
AM
271 } else {
272 if (fRequestPendingCounter > 0) {
273 fRequestPendingCounter--;
274 }
275
276 // fire request if all pending requests are received
277 if (fRequestPendingCounter == 0) {
2b5c3b7e 278 fireRequest(false);
6badfac0 279 fireRequest(true);
fd3f1eff
AM
280 }
281 }
282 }
283 }
284
285 // ------------------------------------------------------------------------
286 // Coalescing
287 // ------------------------------------------------------------------------
288
289 /**
290 * Create a new request from an existing one, and add it to the coalesced
291 * requests
292 *
293 * @param request
294 * The request to copy
c4767854 295 * @since 3.0
fd3f1eff 296 */
8967c8c0
BH
297 protected void newCoalescedEventRequest(ITmfEventRequest request) {
298 synchronized (fLock) {
672a642a 299 TmfCoalescedEventRequest coalescedRequest = new TmfCoalescedEventRequest(
fd3f1eff
AM
300 request.getDataType(),
301 request.getRange(),
302 request.getIndex(),
303 request.getNbRequested(),
304 request.getExecType());
305 coalescedRequest.addRequest(request);
6badfac0 306 coalescedRequest.setEventProvider(this);
5419a136 307 if (TmfCoreTracer.isRequestTraced()) {
8b56808c
GB
308 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
309 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
5419a136 310 }
6ae0ee68 311 coalesceChildrenRequests(coalescedRequest);
5419a136 312 fPendingCoalescedRequests.add(coalescedRequest);
8967c8c0 313 }
fd3f1eff
AM
314 }
315
316 /**
317 * Add an existing requests to the list of coalesced ones
318 *
319 * @param request
320 * The request to add to the list
c4767854 321 * @since 3.0
fd3f1eff
AM
322 */
323 protected void coalesceEventRequest(ITmfEventRequest request) {
324 synchronized (fLock) {
2b5c3b7e 325 for (TmfCoalescedEventRequest coalescedRequest : getPendingRequests()) {
fd3f1eff
AM
326 if (coalescedRequest.isCompatible(request)) {
327 coalescedRequest.addRequest(request);
328 if (TmfCoreTracer.isRequestTraced()) {
8b56808c
GB
329 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
330 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
fd3f1eff 331 }
6ae0ee68 332 coalesceChildrenRequests(coalescedRequest);
fd3f1eff
AM
333 return;
334 }
335 }
336 newCoalescedEventRequest(request);
337 }
338 }
339
6ae0ee68
BH
340 /*
341 * Sends a request with the parent if compatible.
342 */
343 private boolean sendWithParent(final ITmfEventRequest request) {
344 ITmfEventProvider parent = getParent();
345 if (parent instanceof TmfEventProvider) {
346 return ((TmfEventProvider) parent).sendIfCompatible(request);
347 }
348 return false;
349 }
350
351 /*
352 * Sends a request if compatible with a pending coalesced request.
8967c8c0 353 */
6ae0ee68 354 private boolean sendIfCompatible(ITmfEventRequest request) {
8967c8c0 355 synchronized (fLock) {
2b5c3b7e 356 for (TmfCoalescedEventRequest coalescedRequest : getPendingRequests()) {
6ae0ee68
BH
357 if (coalescedRequest.isCompatible(request)) {
358 // Send so it can be coalesced with the parent(s)
359 sendRequest(request);
360 return true;
361 }
362 }
363 }
364 return sendWithParent(request);
365 }
366
367 /*
368 * Coalesces children requests with given request if compatible.
369 */
370 private void coalesceChildrenRequests(final TmfCoalescedEventRequest request) {
371 synchronized (fChildren) {
372 for (TmfEventProvider child : fChildren) {
373 child.coalesceCompatibleRequests(request);
374 }
375 }
376 }
377
378
379 /*
380 * Coalesces all pending requests that are compatible with coalesced request.
381 */
382 private void coalesceCompatibleRequests(TmfCoalescedEventRequest request) {
2b5c3b7e 383 Iterator<TmfCoalescedEventRequest> iter = getPendingRequests().iterator();
6ae0ee68
BH
384 while (iter.hasNext()) {
385 TmfCoalescedEventRequest pendingRequest = iter.next();
386 if (request.isCompatible(pendingRequest)) {
387 request.addRequest(pendingRequest);
388 if (TmfCoreTracer.isRequestTraced()) {
389 TmfCoreTracer.traceRequest(pendingRequest.getRequestId(), "COALESCED with " + request.getRequestId()); //$NON-NLS-1$
390 TmfCoreTracer.traceRequest(request.getRequestId(), "now contains " + request.getSubRequestIds()); //$NON-NLS-1$
8967c8c0 391 }
6ae0ee68 392 iter.remove();
8967c8c0
BH
393 }
394 }
8967c8c0
BH
395 }
396
fd3f1eff
AM
397 // ------------------------------------------------------------------------
398 // Request processing
399 // ------------------------------------------------------------------------
400
fd3f1eff
AM
401 /**
402 * Queue a request.
403 *
404 * @param request
405 * The data request
c4767854 406 * @since 3.0
fd3f1eff
AM
407 */
408 protected void queueRequest(final ITmfEventRequest request) {
409
410 if (fExecutor.isShutdown()) {
411 request.cancel();
412 return;
413 }
414
415 TmfEventThread thread = new TmfEventThread(this, request);
416
417 if (TmfCoreTracer.isRequestTraced()) {
8b56808c 418 TmfCoreTracer.traceRequest(request.getRequestId(), "QUEUED"); //$NON-NLS-1$
fd3f1eff
AM
419 }
420
421 fExecutor.execute(thread);
422 }
423
fd3f1eff
AM
424 /**
425 * Initialize the provider based on the request. The context is provider
426 * specific and will be updated by getNext().
427 *
428 * @param request
429 * The request
430 * @return An application specific context; null if request can't be
431 * serviced
c4767854 432 * @since 3.0
fd3f1eff
AM
433 */
434 public abstract ITmfContext armRequest(ITmfEventRequest request);
435
436 /**
437 * Checks if the data meets the request completion criteria.
438 *
439 * @param request
440 * The request
441 * @param event
442 * The data to verify
443 * @param nbRead
444 * The number of events read so far
445 * @return true if completion criteria is met
c4767854 446 * @since 3.0
fd3f1eff
AM
447 */
448 public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) {
449 boolean requestCompleted = isCompleted2(request, nbRead);
450 if (!requestCompleted) {
451 ITmfTimestamp endTime = request.getRange().getEndTime();
065cc19b 452 return event.getTimestamp().compareTo(endTime) > 0;
fd3f1eff
AM
453 }
454 return requestCompleted;
455 }
456
457 private static boolean isCompleted2(ITmfEventRequest request,int nbRead) {
458 return request.isCompleted() || nbRead >= request.getNbRequested();
459 }
460
461 // ------------------------------------------------------------------------
462 // Pass-through's to the request executor
463 // ------------------------------------------------------------------------
464
465 /**
466 * @return the shutdown state (i.e. if it is accepting new requests)
467 * @since 2.0
468 */
469 protected boolean executorIsShutdown() {
470 return fExecutor.isShutdown();
471 }
472
473 /**
474 * @return the termination state
475 * @since 2.0
476 */
477 protected boolean executorIsTerminated() {
478 return fExecutor.isTerminated();
479 }
480
481 // ------------------------------------------------------------------------
482 // Signal handlers
483 // ------------------------------------------------------------------------
484
485 /**
486 * Handler for the start synch signal
487 *
488 * @param signal
489 * Incoming signal
490 */
491 @TmfSignalHandler
492 public void startSynch(TmfStartSynchSignal signal) {
493 synchronized (fLock) {
494 fSignalDepth++;
495 }
496 }
497
498 /**
499 * Handler for the end synch signal
500 *
501 * @param signal
502 * Incoming signal
503 */
504 @TmfSignalHandler
505 public void endSynch(TmfEndSynchSignal signal) {
506 synchronized (fLock) {
507 fSignalDepth--;
508 if (fSignalDepth == 0) {
2b5c3b7e 509 fireRequest(false);
fd3f1eff 510 }
5419a136
AM
511 }
512 }
951d134a 513
d77f31da
BH
514 @Override
515 public ITmfEventProvider getParent() {
516 synchronized (fLock) {
517 return fParent;
518 }
519 }
520
521 @Override
522 public void setParent(ITmfEventProvider parent) {
523 if (!(parent instanceof TmfEventProvider)) {
524 throw new IllegalArgumentException();
525 }
526
527 synchronized (fLock) {
528 fParent = (TmfEventProvider) parent;
529 }
530 }
531
532 @Override
533 public List<ITmfEventProvider> getChildren() {
534 synchronized (fChildren) {
535 List<ITmfEventProvider> list = new ArrayList<>();
536 list.addAll(fChildren);
537 return list;
538 }
539 }
540
541 @Override
542 public <T extends ITmfEventProvider> List<T> getChildren(Class<T> clazz) {
543 List<T> list = new ArrayList<>();
544 synchronized (fChildren) {
545 for (TmfEventProvider child : fChildren) {
546 if (clazz.isAssignableFrom(child.getClass())) {
547 list.add(clazz.cast(child));
548 }
549 }
550 }
551 return list;
552 }
553
554 @Override
555 public ITmfEventProvider getChild(String name) {
556 synchronized (fChildren) {
557 for (TmfEventProvider child : fChildren) {
558 if (child.getName().equals(name)) {
559 return child;
560 }
561 }
562 }
563 return null;
564 }
565
566 @SuppressWarnings("null")
567 @Override
568 public ITmfEventProvider getChild(int index) {
569 return fChildren.get(index);
570 }
571
572 @Override
573 public void addChild(ITmfEventProvider child) {
574 if (!(child instanceof TmfEventProvider)) {
575 throw new IllegalArgumentException();
576 }
577 fChildren.add((TmfEventProvider)child);
578 }
579
580 @Override
581 public int getNbChildren() {
582 return fChildren.size();
583 }
2b5c3b7e 584
6badfac0
BH
585 @Override
586 public boolean providesEvent(ITmfEvent event) {
587 if ((event.getTrace() == this)) {
588 return true;
589 }
590 if (fChildren.size() > 0) {
591 synchronized (fLock) {
592 List <TmfEventProvider> children = getChildren(TmfEventProvider.class);
593 for (TmfEventProvider child : children) {
594 if (child.providesEvent(event)) {
595 return true;
596 }
597 }
598 }
599 }
600 return false;
601 }
602
2b5c3b7e
BH
603 // ------------------------------------------------------------------------
604 // Debug code (will also be used in tests using reflection)
605 // ------------------------------------------------------------------------
606
607 /**
608 * Gets a list of all pending requests. Debug code.
609 *
610 * @return a list of all pending requests
611 */
612 private List<TmfCoalescedEventRequest> getPendingRequests() {
613 return fPendingCoalescedRequests;
614 }
615
616 /**
617 * Clears all pending requests. Debug code.
618 */
619 private void clearPendingRequests() {
620 fPendingCoalescedRequests.clear();
621 }
622
623 /**
624 * Enables/disables the timer. Debug code.
625 *
626 * @param enabled
627 * the enable flag to set
628 */
629 private void setTimerEnabled(Boolean enabled) {
630 fIsTimerEnabled = enabled;
631 }
8c8bf09f 632}
This page took 0.101217 seconds and 5 git commands to generate.