tmf: clean-up event provider package cycle
[deliverable/tracecompass.git] / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / tmf / core / component / TmfEventProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2014 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation, replace background
11 * requests by preemptable requests
12 * Alexandre Montplaisir - Merge with TmfDataProvider
13 * Bernd Hufmann - Add timer based coalescing for background requests
14 *******************************************************************************/
15
16 package org.eclipse.tracecompass.tmf.core.component;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Timer;
24 import java.util.TimerTask;
25
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.eclipse.tracecompass.common.core.NonNullUtils;
28 import org.eclipse.tracecompass.internal.tmf.core.TmfCoreTracer;
29 import org.eclipse.tracecompass.internal.tmf.core.component.TmfEventThread;
30 import org.eclipse.tracecompass.internal.tmf.core.component.TmfProviderManager;
31 import org.eclipse.tracecompass.internal.tmf.core.request.TmfCoalescedEventRequest;
32 import org.eclipse.tracecompass.internal.tmf.core.request.TmfRequestExecutor;
33 import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
34 import org.eclipse.tracecompass.tmf.core.filter.ITmfFilter;
35 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
36 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest.ExecutionType;
37 import org.eclipse.tracecompass.tmf.core.signal.TmfEndSynchSignal;
38 import org.eclipse.tracecompass.tmf.core.signal.TmfSignalHandler;
39 import org.eclipse.tracecompass.tmf.core.signal.TmfStartSynchSignal;
40 import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
41 import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
42
43 /**
44 * An abstract base class that implements ITmfEventProvider.
45 * <p>
46 * This abstract class implements the housekeeping methods to register/
47 * de-register the event provider and to handle generically the event requests.
48 * </p>
49 *
50 * @author Francois Chouinard
51 * @since 3.0
52 */
53 public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider, ITmfFilter {
54
// ------------------------------------------------------------------------
// Constants
// ------------------------------------------------------------------------

/**
 * Default amount of events per request "chunk"
 *
 * @since 3.0
 */
public static final int DEFAULT_BLOCK_SIZE = 50000;

/**
 * Delay for coalescing background requests (in milliseconds). A request
 * that is neither sent with a parent nor FOREGROUND re-arms the timer with
 * this delay in sendRequest().
 */
private static final long DELAY = 1000;

// ------------------------------------------------------------------------
// Attributes
// ------------------------------------------------------------------------

/**
 * List of coalesced requests that have not been queued yet. Entries are
 * removed when fireRequest() queues them, when a parent absorbs them in
 * coalesceCompatibleRequests(), or when the list is cleared.
 */
private final List<TmfCoalescedEventRequest> fPendingCoalescedRequests = new LinkedList<>();

/** The type of event handled by this provider (set by init()) */
private Class<? extends ITmfEvent> fType;

/** Executor that runs the request threads created by queueRequest() */
private final TmfRequestExecutor fExecutor;

/**
 * Monitor held while the pending request list, the counters below, the
 * timer and the parent reference are read or modified.
 */
private final Object fLock = new Object();

/**
 * Nesting depth of start/end synch signals. While it is greater than zero,
 * foreground requests are coalesced instead of being queued right away.
 */
private int fSignalDepth = 0;

/**
 * Counter driven by notifyPendingRequest(). While it is greater than zero,
 * fireRequest() does nothing and foreground requests are coalesced.
 */
private int fRequestPendingCounter = 0;

/**
 * Timer used to delay the dispatching of coalesced requests. Created by
 * init() and reset to null by dispose(); sendRequest() queues requests
 * directly while it is null.
 */
private Timer fTimer;

/**
 * Current timer task. Starts as a no-op task so that sendRequest() can
 * cancel it unconditionally before scheduling a new one.
 */
@NonNull private TimerTask fCurrentTask = new TimerTask() { @Override public void run() {} };

/** When false, sendRequest() still coalesces but does not schedule the timer */
private boolean fIsTimerEnabled;

/**
 * The parent event provider.
 */
private TmfEventProvider fParent = null;
/**
 * The list of children event providers.
 */
private final List<TmfEventProvider> fChildren = Collections.synchronizedList(new ArrayList<TmfEventProvider>());
99
100 // ------------------------------------------------------------------------
101 // Constructors
102 // ------------------------------------------------------------------------
103
/**
 * Default constructor.
 * <p>
 * Creates the request executor and enables the coalescing timer flag. The
 * timer itself is only created, and the provider only registered, once
 * {@link #init(String, Class)} is called.
 * </p>
 */
public TmfEventProvider() {
    super();
    fExecutor = new TmfRequestExecutor();
    setTimerEnabled(true);
}
112
/**
 * Standard constructor. Instantiate and initialize at the same time.
 * <p>
 * Equivalent to calling the default constructor followed by
 * {@link #init(String, Class)}.
 * </p>
 *
 * @param name
 *            Name of the provider
 * @param type
 *            The type of events that will be handled
 */
public TmfEventProvider(String name, Class<? extends ITmfEvent> type) {
    this();
    // NOTE: init() is public and non-final, so a subclass override runs
    // here, before the subclass constructor body has executed.
    init(name, type);
}
125
126 /**
127 * Initialize this data provider
128 *
129 * @param name
130 * Name of the provider
131 * @param type
132 * The type of events that will be handled
133 */
134 public void init(String name, Class<? extends ITmfEvent> type) {
135 super.init(name);
136 fType = type;
137 fExecutor.init();
138
139 fSignalDepth = 0;
140
141 synchronized (fLock) {
142 fTimer = new Timer();
143 }
144
145 TmfProviderManager.register(fType, this);
146 }
147
148 @Override
149 public void dispose() {
150 TmfProviderManager.deregister(fType, this);
151 fExecutor.stop();
152 synchronized (fLock) {
153 if (fTimer != null) {
154 fTimer.cancel();
155 }
156 fTimer = null;
157 }
158
159 synchronized (fChildren) {
160 for (TmfEventProvider child : fChildren) {
161 child.dispose();
162 }
163 fChildren.clear();
164 }
165 clearPendingRequests();
166 super.dispose();
167 }
168
169 // ------------------------------------------------------------------------
170 // Accessors
171 // ------------------------------------------------------------------------
172
173 /**
174 * Get the event type this provider handles
175 *
176 * @return The type of ITmfEvent
177 */
178 public Class<? extends ITmfEvent> getType() {
179 return fType;
180 }
181
182 // ------------------------------------------------------------------------
183 // ITmfRequestHandler
184 // ------------------------------------------------------------------------
185
186 /**
187 * @since 3.0
188 */
189 @Override
190 public void sendRequest(final ITmfEventRequest request) {
191 synchronized (fLock) {
192
193 if (TmfCoreTracer.isRequestTraced()) {
194 TmfCoreTracer.traceRequest(request.getRequestId(), "SENT to provider " + getName()); //$NON-NLS-1$
195 }
196
197 if (request.getProviderFilter() == null) {
198 request.setProviderFilter(this);
199 }
200
201 if (sendWithParent(request)) {
202 return;
203 }
204
205 if (request.getExecType() == ExecutionType.FOREGROUND) {
206 if ((fSignalDepth > 0) || (fRequestPendingCounter > 0)) {
207 coalesceEventRequest(request);
208 } else {
209 queueRequest(request);
210 }
211 return;
212 }
213
214 /*
215 * Dispatch request in case timer is not running.
216 */
217 if (fTimer == null) {
218 queueRequest(request);
219 return;
220 }
221
222 coalesceEventRequest(request);
223
224 if (fIsTimerEnabled) {
225 fCurrentTask.cancel();
226 fCurrentTask = new TimerTask() {
227 @Override
228 public void run() {
229 synchronized (fLock) {
230 fireRequest(true);
231 }
232 }
233 };
234 fTimer.schedule(fCurrentTask, DELAY);
235 }
236 }
237 }
238
239 private void fireRequest(boolean isTimeout) {
240 synchronized (fLock) {
241 if (fRequestPendingCounter > 0) {
242 return;
243 }
244
245 if (fPendingCoalescedRequests.size() > 0) {
246 Iterator<TmfCoalescedEventRequest> iter = fPendingCoalescedRequests.iterator();
247 while (iter.hasNext()) {
248 ExecutionType type = (isTimeout ? ExecutionType.BACKGROUND : ExecutionType.FOREGROUND);
249 ITmfEventRequest request = iter.next();
250 if (type == request.getExecType()) {
251 queueRequest(request);
252 iter.remove();
253 }
254 }
255 }
256 }
257 }
258
259 /**
260 * Increments/decrements the pending requests counters and fires the request
261 * if necessary (counter == 0). Used for coalescing requests across multiple
262 * TmfDataProvider's.
263 *
264 * @param isIncrement
265 * Should we increment (true) or decrement (false) the pending
266 * counter
267 */
268 @Override
269 public void notifyPendingRequest(boolean isIncrement) {
270 synchronized (fLock) {
271 if (isIncrement) {
272 fRequestPendingCounter++;
273 } else {
274 if (fRequestPendingCounter > 0) {
275 fRequestPendingCounter--;
276 }
277
278 // fire request if all pending requests are received
279 if (fRequestPendingCounter == 0) {
280 fireRequest(false);
281 fireRequest(true);
282 }
283 }
284 }
285 }
286
287 // ------------------------------------------------------------------------
288 // Coalescing
289 // ------------------------------------------------------------------------
290
291 /**
292 * Create a new request from an existing one, and add it to the coalesced
293 * requests
294 *
295 * @param request
296 * The request to copy
297 * @since 3.0
298 */
299 protected void newCoalescedEventRequest(ITmfEventRequest request) {
300 synchronized (fLock) {
301 TmfCoalescedEventRequest coalescedRequest = new TmfCoalescedEventRequest(
302 request.getDataType(),
303 request.getRange(),
304 request.getIndex(),
305 request.getNbRequested(),
306 request.getExecType());
307 coalescedRequest.addRequest(request);
308 coalescedRequest.setProviderFilter(this);
309 if (TmfCoreTracer.isRequestTraced()) {
310 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
311 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
312 }
313 coalesceChildrenRequests(coalescedRequest);
314 fPendingCoalescedRequests.add(coalescedRequest);
315 }
316 }
317
318 /**
319 * Add an existing requests to the list of coalesced ones
320 *
321 * @param request
322 * The request to add to the list
323 * @since 3.0
324 */
325 protected void coalesceEventRequest(ITmfEventRequest request) {
326 synchronized (fLock) {
327 for (TmfCoalescedEventRequest coalescedRequest : getPendingRequests()) {
328 if (coalescedRequest.isCompatible(request)) {
329 coalescedRequest.addRequest(request);
330 if (TmfCoreTracer.isRequestTraced()) {
331 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
332 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
333 }
334 coalesceChildrenRequests(coalescedRequest);
335 return;
336 }
337 }
338 newCoalescedEventRequest(request);
339 }
340 }
341
342 /*
343 * Sends a request with the parent if compatible.
344 */
345 private boolean sendWithParent(final ITmfEventRequest request) {
346 ITmfEventProvider parent = getParent();
347 if (parent instanceof TmfEventProvider) {
348 return ((TmfEventProvider) parent).sendIfCompatible(request);
349 }
350 return false;
351 }
352
353 /*
354 * Sends a request if compatible with a pending coalesced request.
355 */
356 private boolean sendIfCompatible(ITmfEventRequest request) {
357 synchronized (fLock) {
358 for (TmfCoalescedEventRequest coalescedRequest : getPendingRequests()) {
359 if (coalescedRequest.isCompatible(request)) {
360 // Send so it can be coalesced with the parent(s)
361 sendRequest(request);
362 return true;
363 }
364 }
365 }
366 return sendWithParent(request);
367 }
368
369 /*
370 * Coalesces children requests with given request if compatible.
371 */
372 private void coalesceChildrenRequests(final TmfCoalescedEventRequest request) {
373 synchronized (fChildren) {
374 for (TmfEventProvider child : fChildren) {
375 child.coalesceCompatibleRequests(request);
376 }
377 }
378 }
379
380
381 /*
382 * Coalesces all pending requests that are compatible with coalesced request.
383 */
384 private void coalesceCompatibleRequests(TmfCoalescedEventRequest request) {
385 Iterator<TmfCoalescedEventRequest> iter = getPendingRequests().iterator();
386 while (iter.hasNext()) {
387 TmfCoalescedEventRequest pendingRequest = iter.next();
388 if (request.isCompatible(pendingRequest)) {
389 request.addRequest(pendingRequest);
390 if (TmfCoreTracer.isRequestTraced()) {
391 TmfCoreTracer.traceRequest(pendingRequest.getRequestId(), "COALESCED with " + request.getRequestId()); //$NON-NLS-1$
392 TmfCoreTracer.traceRequest(request.getRequestId(), "now contains " + request.getSubRequestIds()); //$NON-NLS-1$
393 }
394 iter.remove();
395 }
396 }
397 }
398
399 // ------------------------------------------------------------------------
400 // Request processing
401 // ------------------------------------------------------------------------
402
403 /**
404 * Queue a request.
405 *
406 * @param request
407 * The data request
408 * @since 3.0
409 */
410 protected void queueRequest(final ITmfEventRequest request) {
411
412 if (fExecutor.isShutdown()) {
413 request.cancel();
414 return;
415 }
416
417 TmfEventThread thread = new TmfEventThread(this, request);
418
419 if (TmfCoreTracer.isRequestTraced()) {
420 TmfCoreTracer.traceRequest(request.getRequestId(), "QUEUED"); //$NON-NLS-1$
421 }
422
423 fExecutor.execute(thread);
424 }
425
/**
 * Initialize the provider based on the request. The context is provider
 * specific and will be updated by getNext().
 * <p>
 * Implemented by the concrete provider; this class does not call it
 * directly.
 * </p>
 *
 * @param request
 *            The request
 * @return An application specific context; null if request can't be
 *         serviced
 * @since 3.0
 */
public abstract ITmfContext armRequest(ITmfEventRequest request);
437
438 /**
439 * Checks if the data meets the request completion criteria.
440 *
441 * @param request
442 * The request
443 * @param event
444 * The data to verify
445 * @param nbRead
446 * The number of events read so far
447 * @return true if completion criteria is met
448 * @since 3.0
449 */
450 public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) {
451 boolean requestCompleted = isCompleted2(request, nbRead);
452 if (!requestCompleted) {
453 ITmfTimestamp endTime = request.getRange().getEndTime();
454 return event.getTimestamp().compareTo(endTime) > 0;
455 }
456 return requestCompleted;
457 }
458
459 private static boolean isCompleted2(ITmfEventRequest request,int nbRead) {
460 return request.isCompleted() || nbRead >= request.getNbRequested();
461 }
462
463 // ------------------------------------------------------------------------
464 // Pass-through's to the request executor
465 // ------------------------------------------------------------------------
466
467 /**
468 * @return the shutdown state (i.e. if it is accepting new requests)
469 * @since 2.0
470 */
471 protected boolean executorIsShutdown() {
472 return fExecutor.isShutdown();
473 }
474
475 /**
476 * @return the termination state
477 * @since 2.0
478 */
479 protected boolean executorIsTerminated() {
480 return fExecutor.isTerminated();
481 }
482
483 // ------------------------------------------------------------------------
484 // Signal handlers
485 // ------------------------------------------------------------------------
486
487 /**
488 * Handler for the start synch signal
489 *
490 * @param signal
491 * Incoming signal
492 */
493 @TmfSignalHandler
494 public void startSynch(TmfStartSynchSignal signal) {
495 synchronized (fLock) {
496 fSignalDepth++;
497 }
498 }
499
500 /**
501 * Handler for the end synch signal
502 *
503 * @param signal
504 * Incoming signal
505 */
506 @TmfSignalHandler
507 public void endSynch(TmfEndSynchSignal signal) {
508 synchronized (fLock) {
509 fSignalDepth--;
510 if (fSignalDepth == 0) {
511 fireRequest(false);
512 }
513 }
514 }
515
516 @Override
517 public ITmfEventProvider getParent() {
518 synchronized (fLock) {
519 return fParent;
520 }
521 }
522
523 @Override
524 public void setParent(ITmfEventProvider parent) {
525 if (!(parent instanceof TmfEventProvider)) {
526 throw new IllegalArgumentException();
527 }
528
529 synchronized (fLock) {
530 fParent = (TmfEventProvider) parent;
531 }
532 }
533
534 @Override
535 public List<ITmfEventProvider> getChildren() {
536 synchronized (fChildren) {
537 List<ITmfEventProvider> list = new ArrayList<>();
538 list.addAll(fChildren);
539 return list;
540 }
541 }
542
543 @Override
544 public <T extends ITmfEventProvider> List<T> getChildren(Class<T> clazz) {
545 List<T> list = new ArrayList<>();
546 synchronized (fChildren) {
547 for (TmfEventProvider child : fChildren) {
548 if (clazz.isAssignableFrom(child.getClass())) {
549 list.add(clazz.cast(child));
550 }
551 }
552 }
553 return list;
554 }
555
556 @Override
557 public ITmfEventProvider getChild(String name) {
558 synchronized (fChildren) {
559 for (TmfEventProvider child : fChildren) {
560 if (child.getName().equals(name)) {
561 return child;
562 }
563 }
564 }
565 return null;
566 }
567
568 @Override
569 public ITmfEventProvider getChild(int index) {
570 return NonNullUtils.checkNotNull(fChildren.get(index));
571 }
572
573 @Override
574 public void addChild(ITmfEventProvider child) {
575 if (!(child instanceof TmfEventProvider)) {
576 throw new IllegalArgumentException();
577 }
578 child.setParent(this);
579 fChildren.add((TmfEventProvider)child);
580 }
581
582 @Override
583 public int getNbChildren() {
584 return fChildren.size();
585 }
586
587 /**
588 * Returns true if an event was provided by this event provider or one of
589 * its children event providers else false.
590 *
591 * @param event
592 * the event to check
593 * @return <code>true</code> if event was provided by this provider or one
594 * of its children else <code>false</code>
595 */
596 @Override
597 public boolean matches(ITmfEvent event) {
598 if ((event.getTrace() == this)) {
599 return true;
600 }
601 if (fChildren.size() > 0) {
602 synchronized (fLock) {
603 List <TmfEventProvider> children = getChildren(TmfEventProvider.class);
604 for (TmfEventProvider child : children) {
605 if (child.matches(event)) {
606 return true;
607 }
608 }
609 }
610 }
611 return false;
612 }
613
614 // ------------------------------------------------------------------------
615 // Debug code (will also be used in tests using reflection)
616 // ------------------------------------------------------------------------
617
618 /**
619 * Gets a list of all pending requests. Debug code.
620 *
621 * @return a list of all pending requests
622 */
623 private List<TmfCoalescedEventRequest> getPendingRequests() {
624 return fPendingCoalescedRequests;
625 }
626
627 /**
628 * Clears all pending requests. Debug code.
629 */
630 private void clearPendingRequests() {
631 fPendingCoalescedRequests.clear();
632 }
633
634 /**
635 * Enables/disables the timer. Debug code.
636 *
637 * @param enabled
638 * the enable flag to set
639 */
640 private void setTimerEnabled(Boolean enabled) {
641 fIsTimerEnabled = enabled;
642 }
643 }
This page took 0.048258 seconds and 6 git commands to generate.