tmf: Make TransientState really re-entrant
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfEventProvider.java
CommitLineData
/*******************************************************************************
 * Copyright (c) 2009, 2014 Ericsson
 *
 * All rights reserved. This program and the accompanying materials are
 * made available under the terms of the Eclipse Public License v1.0 which
 * accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *   Francois Chouinard - Initial API and implementation, replace background
 *       requests by preemptable requests
 *   Alexandre Montplaisir - Merge with TmfDataProvider
 *   Bernd Hufmann - Add timer based coalescing for background requests
 *******************************************************************************/
15
6c13869b 16package org.eclipse.linuxtools.tmf.core.component;
8c8bf09f 17
fd3f1eff 18import java.util.ArrayList;
8967c8c0 19import java.util.Iterator;
fd3f1eff 20import java.util.List;
8967c8c0
BH
21import java.util.Timer;
22import java.util.TimerTask;
fd3f1eff
AM
23import java.util.concurrent.BlockingQueue;
24import java.util.concurrent.LinkedBlockingQueue;
25import java.util.concurrent.SynchronousQueue;
26
5419a136 27import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer;
fd3f1eff
AM
28import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread;
29import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
5419a136 30import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedEventRequest;
fd3f1eff 31import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
72f1e62a 32import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
5419a136 33import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
fd3f1eff
AM
34import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest.ExecutionType;
35import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
36import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
37import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
3bd46eef 38import org.eclipse.linuxtools.tmf.core.timestamp.ITmfTimestamp;
fd3f1eff 39import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
40
/**
 * An abstract base class that implements ITmfEventProvider.
 * <p>
 * This abstract class implements the housekeeping methods to register/
 * de-register the event provider and to handle the event requests
 * generically.
 * </p>
 *
 * @author Francois Chouinard
 * @since 3.0
 */
fd3f1eff
AM
51public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider {
52
53 // ------------------------------------------------------------------------
54 // Constants
55 // ------------------------------------------------------------------------
56
c4767854
AM
57 /** Default amount of events per request "chunk"
58 * @since 3.0 */
fd3f1eff
AM
59 public static final int DEFAULT_BLOCK_SIZE = 50000;
60
c4767854
AM
61 /** Default size of the queue
62 * @since 3.0 */
fd3f1eff
AM
63 public static final int DEFAULT_QUEUE_SIZE = 1000;
64
8967c8c0
BH
65 /** Delay for coalescing background requests (in milli-seconds) */
66 private static final long DELAY = 1000;
67
fd3f1eff
AM
68 // ------------------------------------------------------------------------
69 // Attributes
70 // ------------------------------------------------------------------------
71
c4767854
AM
72 /** List of coalesced requests
73 * @since 3.0*/
a4524c1b 74 protected final List<TmfCoalescedEventRequest> fPendingCoalescedRequests = new ArrayList<>();
fd3f1eff 75
c4767854
AM
76 /** The type of event handled by this provider
77 * @since 3.0*/
fd3f1eff
AM
78 protected Class<? extends ITmfEvent> fType;
79
c4767854
AM
80 /** Queue of events
81 * @since 3.0*/
fd3f1eff
AM
82 protected BlockingQueue<ITmfEvent> fDataQueue;
83
c4767854
AM
84 /** Size of the fDataQueue
85 * @since 3.0*/
fd3f1eff
AM
86 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
87
88 private final TmfRequestExecutor fExecutor;
89
90 private final Object fLock = new Object();
91
92 private int fSignalDepth = 0;
93
94 private int fRequestPendingCounter = 0;
8c8bf09f 95
8967c8c0
BH
96 private final Timer fTimer;
97
98 private boolean fIsTimeout = false;
99
00641a97
FC
100 // ------------------------------------------------------------------------
101 // Constructors
102 // ------------------------------------------------------------------------
103
/**
 * Default constructor.
 * <p>
 * Sets the queue size to {@link #DEFAULT_QUEUE_SIZE} and creates the event
 * queue, the request executor and the coalescing timer. The provider is
 * not registered until {@link #init(String, Class)} is called.
 * </p>
 */
public TmfEventProvider() {
    super();
    fQueueSize = DEFAULT_QUEUE_SIZE;
    fExecutor = new TmfRequestExecutor();
    fTimer = new Timer();
    fDataQueue = new LinkedBlockingQueue<>(fQueueSize);
}
8c8bf09f 114
063f0d27 115 /**
fd3f1eff 116 * Initialize this data provider
063f0d27
AM
117 *
118 * @param name
fd3f1eff 119 * Name of the provider
063f0d27 120 * @param type
fd3f1eff 121 * The type of events that will be handled
063f0d27 122 */
fd3f1eff
AM
123 public void init(String name, Class<? extends ITmfEvent> type) {
124 super.init(name);
125 fType = type;
126 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<ITmfEvent>(fQueueSize) : new SynchronousQueue<ITmfEvent>();
127
128 fExecutor.init();
8967c8c0 129
fd3f1eff
AM
130 fSignalDepth = 0;
131
132 TmfProviderManager.register(fType, this);
12c155f5
FC
133 }
134
/**
 * Constructor specifying the event type and the queue size. The provider
 * is initialized (see {@link #init(String, Class)}) with the given queue
 * size in effect.
 *
 * @param name
 *            Name of the provider
 * @param type
 *            Type of event that will be handled
 * @param queueSize
 *            Size of the event queue
 */
protected TmfEventProvider(String name, Class<? extends ITmfEvent> type, int queueSize) {
    this();
    // Must be set before init(), which sizes the queue from this field
    this.fQueueSize = queueSize;
    init(name, type);
}
150
063f0d27
AM
151 /**
152 * Copy constructor
153 *
154 * @param other
fd3f1eff 155 * The other object to copy
063f0d27 156 */
6256d8ad 157 public TmfEventProvider(TmfEventProvider other) {
fd3f1eff
AM
158 this();
159 init(other.getName(), other.fType);
160 }
161
/**
 * Standard constructor. Instantiates and initializes the provider in one
 * step, using {@link #DEFAULT_QUEUE_SIZE} as the queue size.
 *
 * @param name
 *            Name of the provider
 * @param type
 *            The type of events that will be handled
 */
public TmfEventProvider(String name, Class<? extends ITmfEvent> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
173
174 @Override
175 public void dispose() {
176 TmfProviderManager.deregister(fType, this);
177 fExecutor.stop();
178 super.dispose();
12c155f5
FC
179 }
180
00641a97 181 // ------------------------------------------------------------------------
fd3f1eff
AM
182 // Accessors
183 // ------------------------------------------------------------------------
184
185 /**
186 * Get the queue size of this provider
187 *
188 * @return The size of the queue
189 */
190 public int getQueueSize() {
191 return fQueueSize;
192 }
193
194 /**
195 * Get the event type this provider handles
196 *
197 * @return The type of ITmfEvent
198 */
0f89d4ba 199 public Class<? extends ITmfEvent> getType() {
fd3f1eff
AM
200 return fType;
201 }
202
203 // ------------------------------------------------------------------------
204 // ITmfRequestHandler
00641a97
FC
205 // ------------------------------------------------------------------------
206
c4767854
AM
207 /**
208 * @since 3.0
209 */
5419a136 210 @Override
fd3f1eff
AM
211 public void sendRequest(final ITmfEventRequest request) {
212 synchronized (fLock) {
8967c8c0
BH
213 if (request.getExecType() == ExecutionType.FOREGROUND) {
214 if ((fSignalDepth > 0) || (fRequestPendingCounter > 0)) {
215 coalesceEventRequest(request);
216 } else {
217 dispatchRequest(request);
218 }
219 return;
220 }
221
222 /*
223 * For the first background request in the request pending queue
224 * a timer will be started to allow other background requests to
225 * coalesce.
226 */
227 boolean startTimer = (getNbPendingBackgroundRequests() == 0);
228 coalesceEventRequest(request);
229 if (startTimer) {
230 TimerTask task = new TimerTask() {
231 @Override
232 public void run() {
233 synchronized (fLock) {
234 fIsTimeout = true;
235 fireRequest();
236 }
237 }
238 };
239 fTimer.schedule(task, DELAY);
fd3f1eff
AM
240 }
241 }
242 }
243
244 @Override
245 public void fireRequest() {
246 synchronized (fLock) {
247 if (fRequestPendingCounter > 0) {
248 return;
249 }
8967c8c0 250
fd3f1eff 251 if (fPendingCoalescedRequests.size() > 0) {
8967c8c0
BH
252 Iterator<TmfCoalescedEventRequest> iter = fPendingCoalescedRequests.iterator();
253 while (iter.hasNext()) {
254 ExecutionType type = (fIsTimeout ? ExecutionType.BACKGROUND : ExecutionType.FOREGROUND);
255 ITmfEventRequest request = iter.next();
256 if (type == request.getExecType()) {
257 dispatchRequest(request);
258 iter.remove();
259 }
fd3f1eff 260 }
fd3f1eff 261 }
5419a136 262 }
5419a136
AM
263 }
264
fd3f1eff
AM
265 /**
266 * Increments/decrements the pending requests counters and fires the request
267 * if necessary (counter == 0). Used for coalescing requests across multiple
268 * TmfDataProvider's.
269 *
270 * @param isIncrement
271 * Should we increment (true) or decrement (false) the pending
272 * counter
273 */
5419a136 274 @Override
fd3f1eff
AM
275 public void notifyPendingRequest(boolean isIncrement) {
276 synchronized (fLock) {
277 if (isIncrement) {
8967c8c0 278 fRequestPendingCounter++;
fd3f1eff
AM
279 } else {
280 if (fRequestPendingCounter > 0) {
281 fRequestPendingCounter--;
282 }
283
284 // fire request if all pending requests are received
285 if (fRequestPendingCounter == 0) {
286 fireRequest();
287 }
288 }
289 }
290 }
291
292 // ------------------------------------------------------------------------
293 // Coalescing
294 // ------------------------------------------------------------------------
295
296 /**
297 * Create a new request from an existing one, and add it to the coalesced
298 * requests
299 *
300 * @param request
301 * The request to copy
c4767854 302 * @since 3.0
fd3f1eff 303 */
8967c8c0
BH
304 protected void newCoalescedEventRequest(ITmfEventRequest request) {
305 synchronized (fLock) {
672a642a 306 TmfCoalescedEventRequest coalescedRequest = new TmfCoalescedEventRequest(
fd3f1eff
AM
307 request.getDataType(),
308 request.getRange(),
309 request.getIndex(),
310 request.getNbRequested(),
311 request.getExecType());
312 coalescedRequest.addRequest(request);
5419a136
AM
313 if (TmfCoreTracer.isRequestTraced()) {
314 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
315 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
316 }
317 fPendingCoalescedRequests.add(coalescedRequest);
8967c8c0 318 }
fd3f1eff
AM
319 }
320
321 /**
322 * Add an existing requests to the list of coalesced ones
323 *
324 * @param request
325 * The request to add to the list
c4767854 326 * @since 3.0
fd3f1eff
AM
327 */
328 protected void coalesceEventRequest(ITmfEventRequest request) {
329 synchronized (fLock) {
330 for (TmfCoalescedEventRequest coalescedRequest : fPendingCoalescedRequests) {
331 if (coalescedRequest.isCompatible(request)) {
332 coalescedRequest.addRequest(request);
333 if (TmfCoreTracer.isRequestTraced()) {
334 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
335 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
336 }
337 return;
338 }
339 }
340 newCoalescedEventRequest(request);
341 }
342 }
343
8967c8c0
BH
344 /**
345 * Gets the number of background requests in pending queue.
346 *
347 * @return the number of background requests in pending queue
348 */
349 private int getNbPendingBackgroundRequests() {
350 int nbBackgroundRequests = 0;
351 synchronized (fLock) {
352 for (ITmfEventRequest request : fPendingCoalescedRequests) {
353 if (request.getExecType() == ExecutionType.BACKGROUND) {
354 nbBackgroundRequests++;
355 }
356 }
357 }
358 return nbBackgroundRequests;
359 }
360
fd3f1eff
AM
361 // ------------------------------------------------------------------------
362 // Request processing
363 // ------------------------------------------------------------------------
364
365 private void dispatchRequest(final ITmfEventRequest request) {
366 if (request.getExecType() == ExecutionType.FOREGROUND) {
367 queueRequest(request);
5419a136 368 } else {
fd3f1eff
AM
369 queueBackgroundRequest(request, true);
370 }
371 }
372
373 /**
374 * Queue a request.
375 *
376 * @param request
377 * The data request
c4767854 378 * @since 3.0
fd3f1eff
AM
379 */
380 protected void queueRequest(final ITmfEventRequest request) {
381
382 if (fExecutor.isShutdown()) {
383 request.cancel();
384 return;
385 }
386
387 TmfEventThread thread = new TmfEventThread(this, request);
388
389 if (TmfCoreTracer.isRequestTraced()) {
390 TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
391 }
392
393 fExecutor.execute(thread);
394 }
395
396 /**
397 * Queue a background request
398 *
399 * @param request
400 * The request
401 * @param indexing
402 * Should we index the chunks
403 * @since 3.0
404 */
405 protected void queueBackgroundRequest(final ITmfEventRequest request, final boolean indexing) {
406 queueRequest(request);
407 }
408
409 /**
410 * Initialize the provider based on the request. The context is provider
411 * specific and will be updated by getNext().
412 *
413 * @param request
414 * The request
415 * @return An application specific context; null if request can't be
416 * serviced
c4767854 417 * @since 3.0
fd3f1eff
AM
418 */
419 public abstract ITmfContext armRequest(ITmfEventRequest request);
420
421 /**
422 * Checks if the data meets the request completion criteria.
423 *
424 * @param request
425 * The request
426 * @param event
427 * The data to verify
428 * @param nbRead
429 * The number of events read so far
430 * @return true if completion criteria is met
c4767854 431 * @since 3.0
fd3f1eff
AM
432 */
433 public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) {
434 boolean requestCompleted = isCompleted2(request, nbRead);
435 if (!requestCompleted) {
436 ITmfTimestamp endTime = request.getRange().getEndTime();
437 return event.getTimestamp().compareTo(endTime, false) > 0;
438 }
439 return requestCompleted;
440 }
441
442 private static boolean isCompleted2(ITmfEventRequest request,int nbRead) {
443 return request.isCompleted() || nbRead >= request.getNbRequested();
444 }
445
446 // ------------------------------------------------------------------------
447 // Pass-through's to the request executor
448 // ------------------------------------------------------------------------
449
450 /**
451 * @return the shutdown state (i.e. if it is accepting new requests)
452 * @since 2.0
453 */
454 protected boolean executorIsShutdown() {
455 return fExecutor.isShutdown();
456 }
457
458 /**
459 * @return the termination state
460 * @since 2.0
461 */
462 protected boolean executorIsTerminated() {
463 return fExecutor.isTerminated();
464 }
465
466 // ------------------------------------------------------------------------
467 // Signal handlers
468 // ------------------------------------------------------------------------
469
470 /**
471 * Handler for the start synch signal
472 *
473 * @param signal
474 * Incoming signal
475 */
476 @TmfSignalHandler
477 public void startSynch(TmfStartSynchSignal signal) {
478 synchronized (fLock) {
479 fSignalDepth++;
480 }
481 }
482
483 /**
484 * Handler for the end synch signal
485 *
486 * @param signal
487 * Incoming signal
488 */
489 @TmfSignalHandler
490 public void endSynch(TmfEndSynchSignal signal) {
491 synchronized (fLock) {
492 fSignalDepth--;
493 if (fSignalDepth == 0) {
8967c8c0 494 fIsTimeout = false;
fd3f1eff
AM
495 fireRequest();
496 }
5419a136
AM
497 }
498 }
951d134a 499
8c8bf09f 500}
This page took 0.072198 seconds and 5 git commands to generate.