TMF: Add a request log message when a request is sent to a provider
[deliverable/tracecompass.git] / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / tmf / core / component / TmfEventProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2014 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation, replace background
11 * requests by preemptable requests
12 * Alexandre Montplaisir - Merge with TmfDataProvider
13 * Bernd Hufmann - Add timer based coalescing for background requests
14 *******************************************************************************/
15
16 package org.eclipse.tracecompass.tmf.core.component;
17
18 import java.util.Iterator;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.Timer;
22 import java.util.TimerTask;
23
24 import org.eclipse.tracecompass.internal.tmf.core.TmfCoreTracer;
25 import org.eclipse.tracecompass.internal.tmf.core.component.TmfEventThread;
26 import org.eclipse.tracecompass.internal.tmf.core.component.TmfProviderManager;
27 import org.eclipse.tracecompass.internal.tmf.core.request.TmfCoalescedEventRequest;
28 import org.eclipse.tracecompass.internal.tmf.core.request.TmfRequestExecutor;
29 import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
30 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
31 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest.ExecutionType;
32 import org.eclipse.tracecompass.tmf.core.signal.TmfEndSynchSignal;
33 import org.eclipse.tracecompass.tmf.core.signal.TmfSignalHandler;
34 import org.eclipse.tracecompass.tmf.core.signal.TmfStartSynchSignal;
35 import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
36 import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
37
38 /**
39 * An abstract base class that implements ITmfEventProvider.
40 * <p>
41 * This abstract class implements the housekeeping methods to register/
42 * de-register the event provider and to handle generically the event requests.
43 * </p>
44 *
45 * @author Francois Chouinard
46 * @since 3.0
47 */
48 public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider {
49
50 // ------------------------------------------------------------------------
51 // Constants
52 // ------------------------------------------------------------------------
53
54 /** Default amount of events per request "chunk"
55 * @since 3.0 */
56 public static final int DEFAULT_BLOCK_SIZE = 50000;
57
58 /** Delay for coalescing background requests (in milli-seconds) */
59 private static final long DELAY = 1000;
60
61 // ------------------------------------------------------------------------
62 // Attributes
63 // ------------------------------------------------------------------------
64
65 /** List of coalesced requests */
66 private final List<TmfCoalescedEventRequest> fPendingCoalescedRequests = new LinkedList<>();
67
68 /** The type of event handled by this provider */
69 private Class<? extends ITmfEvent> fType;
70
71 private final TmfRequestExecutor fExecutor;
72
73 private final Object fLock = new Object();
74
75 private int fSignalDepth = 0;
76
77 private int fRequestPendingCounter = 0;
78
79 private Timer fTimer;
80
81 private boolean fIsTimeout = false;
82
83 // ------------------------------------------------------------------------
84 // Constructors
85 // ------------------------------------------------------------------------
86
87 /**
88 * Default constructor
89 */
90 public TmfEventProvider() {
91 super();
92 fExecutor = new TmfRequestExecutor();
93 }
94
95 /**
96 * Standard constructor. Instantiate and initialize at the same time.
97 *
98 * @param name
99 * Name of the provider
100 * @param type
101 * The type of events that will be handled
102 */
103 public TmfEventProvider(String name, Class<? extends ITmfEvent> type) {
104 this();
105 init(name, type);
106 }
107
108 /**
109 * Initialize this data provider
110 *
111 * @param name
112 * Name of the provider
113 * @param type
114 * The type of events that will be handled
115 */
116 public void init(String name, Class<? extends ITmfEvent> type) {
117 super.init(name);
118 fType = type;
119 fExecutor.init();
120
121 fSignalDepth = 0;
122
123 synchronized (fLock) {
124 fTimer = new Timer();
125 }
126
127 TmfProviderManager.register(fType, this);
128 }
129
130 @Override
131 public void dispose() {
132 TmfProviderManager.deregister(fType, this);
133 fExecutor.stop();
134 synchronized (fLock) {
135 if (fTimer != null) {
136 fTimer.cancel();
137 }
138 fTimer = null;
139 }
140 super.dispose();
141 }
142
143 // ------------------------------------------------------------------------
144 // Accessors
145 // ------------------------------------------------------------------------
146
147 /**
148 * Get the event type this provider handles
149 *
150 * @return The type of ITmfEvent
151 */
152 public Class<? extends ITmfEvent> getType() {
153 return fType;
154 }
155
156 // ------------------------------------------------------------------------
157 // ITmfRequestHandler
158 // ------------------------------------------------------------------------
159
160 /**
161 * @since 3.0
162 */
163 @Override
164 public void sendRequest(final ITmfEventRequest request) {
165 synchronized (fLock) {
166
167 if (TmfCoreTracer.isRequestTraced()) {
168 TmfCoreTracer.traceRequest(request.getRequestId(), "SENT to provider " + getName()); //$NON-NLS-1$
169 }
170
171 if (request.getExecType() == ExecutionType.FOREGROUND) {
172 if ((fSignalDepth > 0) || (fRequestPendingCounter > 0)) {
173 coalesceEventRequest(request);
174 } else {
175 queueRequest(request);
176 }
177 return;
178 }
179
180 /*
181 * Dispatch request in case timer is not running.
182 */
183 if (fTimer == null) {
184 queueRequest(request);
185 return;
186 }
187
188 /*
189 * For the first background request in the request pending queue
190 * a timer will be started to allow other background requests to
191 * coalesce.
192 */
193 boolean startTimer = (getNbPendingBackgroundRequests() == 0);
194 coalesceEventRequest(request);
195 if (startTimer) {
196 TimerTask task = new TimerTask() {
197 @Override
198 public void run() {
199 synchronized (fLock) {
200 fIsTimeout = true;
201 fireRequest();
202 }
203 }
204 };
205 fTimer.schedule(task, DELAY);
206 }
207 }
208 }
209
210 private void fireRequest() {
211 synchronized (fLock) {
212 if (fRequestPendingCounter > 0) {
213 return;
214 }
215
216 if (fPendingCoalescedRequests.size() > 0) {
217 Iterator<TmfCoalescedEventRequest> iter = fPendingCoalescedRequests.iterator();
218 while (iter.hasNext()) {
219 ExecutionType type = (fIsTimeout ? ExecutionType.BACKGROUND : ExecutionType.FOREGROUND);
220 ITmfEventRequest request = iter.next();
221 if (type == request.getExecType()) {
222 queueRequest(request);
223 iter.remove();
224 }
225 }
226 }
227 }
228 }
229
230 /**
231 * Increments/decrements the pending requests counters and fires the request
232 * if necessary (counter == 0). Used for coalescing requests across multiple
233 * TmfDataProvider's.
234 *
235 * @param isIncrement
236 * Should we increment (true) or decrement (false) the pending
237 * counter
238 */
239 @Override
240 public void notifyPendingRequest(boolean isIncrement) {
241 synchronized (fLock) {
242 if (isIncrement) {
243 fRequestPendingCounter++;
244 } else {
245 if (fRequestPendingCounter > 0) {
246 fRequestPendingCounter--;
247 }
248
249 // fire request if all pending requests are received
250 if (fRequestPendingCounter == 0) {
251 fireRequest();
252 }
253 }
254 }
255 }
256
257 // ------------------------------------------------------------------------
258 // Coalescing
259 // ------------------------------------------------------------------------
260
261 /**
262 * Create a new request from an existing one, and add it to the coalesced
263 * requests
264 *
265 * @param request
266 * The request to copy
267 * @since 3.0
268 */
269 protected void newCoalescedEventRequest(ITmfEventRequest request) {
270 synchronized (fLock) {
271 TmfCoalescedEventRequest coalescedRequest = new TmfCoalescedEventRequest(
272 request.getDataType(),
273 request.getRange(),
274 request.getIndex(),
275 request.getNbRequested(),
276 request.getExecType());
277 coalescedRequest.addRequest(request);
278 if (TmfCoreTracer.isRequestTraced()) {
279 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
280 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
281 }
282 fPendingCoalescedRequests.add(coalescedRequest);
283 }
284 }
285
286 /**
287 * Add an existing requests to the list of coalesced ones
288 *
289 * @param request
290 * The request to add to the list
291 * @since 3.0
292 */
293 protected void coalesceEventRequest(ITmfEventRequest request) {
294 synchronized (fLock) {
295 for (TmfCoalescedEventRequest coalescedRequest : fPendingCoalescedRequests) {
296 if (coalescedRequest.isCompatible(request)) {
297 coalescedRequest.addRequest(request);
298 if (TmfCoreTracer.isRequestTraced()) {
299 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
300 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
301 }
302 return;
303 }
304 }
305 newCoalescedEventRequest(request);
306 }
307 }
308
309 /**
310 * Gets the number of background requests in pending queue.
311 *
312 * @return the number of background requests in pending queue
313 */
314 private int getNbPendingBackgroundRequests() {
315 int nbBackgroundRequests = 0;
316 synchronized (fLock) {
317 for (ITmfEventRequest request : fPendingCoalescedRequests) {
318 if (request.getExecType() == ExecutionType.BACKGROUND) {
319 nbBackgroundRequests++;
320 }
321 }
322 }
323 return nbBackgroundRequests;
324 }
325
326 // ------------------------------------------------------------------------
327 // Request processing
328 // ------------------------------------------------------------------------
329
330 /**
331 * Queue a request.
332 *
333 * @param request
334 * The data request
335 * @since 3.0
336 */
337 protected void queueRequest(final ITmfEventRequest request) {
338
339 if (fExecutor.isShutdown()) {
340 request.cancel();
341 return;
342 }
343
344 TmfEventThread thread = new TmfEventThread(this, request);
345
346 if (TmfCoreTracer.isRequestTraced()) {
347 TmfCoreTracer.traceRequest(request.getRequestId(), "QUEUED"); //$NON-NLS-1$
348 }
349
350 fExecutor.execute(thread);
351 }
352
353 /**
354 * Initialize the provider based on the request. The context is provider
355 * specific and will be updated by getNext().
356 *
357 * @param request
358 * The request
359 * @return An application specific context; null if request can't be
360 * serviced
361 * @since 3.0
362 */
363 public abstract ITmfContext armRequest(ITmfEventRequest request);
364
365 /**
366 * Checks if the data meets the request completion criteria.
367 *
368 * @param request
369 * The request
370 * @param event
371 * The data to verify
372 * @param nbRead
373 * The number of events read so far
374 * @return true if completion criteria is met
375 * @since 3.0
376 */
377 public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) {
378 boolean requestCompleted = isCompleted2(request, nbRead);
379 if (!requestCompleted) {
380 ITmfTimestamp endTime = request.getRange().getEndTime();
381 return event.getTimestamp().compareTo(endTime) > 0;
382 }
383 return requestCompleted;
384 }
385
386 private static boolean isCompleted2(ITmfEventRequest request,int nbRead) {
387 return request.isCompleted() || nbRead >= request.getNbRequested();
388 }
389
390 // ------------------------------------------------------------------------
391 // Pass-through's to the request executor
392 // ------------------------------------------------------------------------
393
394 /**
395 * @return the shutdown state (i.e. if it is accepting new requests)
396 * @since 2.0
397 */
398 protected boolean executorIsShutdown() {
399 return fExecutor.isShutdown();
400 }
401
402 /**
403 * @return the termination state
404 * @since 2.0
405 */
406 protected boolean executorIsTerminated() {
407 return fExecutor.isTerminated();
408 }
409
410 // ------------------------------------------------------------------------
411 // Signal handlers
412 // ------------------------------------------------------------------------
413
414 /**
415 * Handler for the start synch signal
416 *
417 * @param signal
418 * Incoming signal
419 */
420 @TmfSignalHandler
421 public void startSynch(TmfStartSynchSignal signal) {
422 synchronized (fLock) {
423 fSignalDepth++;
424 }
425 }
426
427 /**
428 * Handler for the end synch signal
429 *
430 * @param signal
431 * Incoming signal
432 */
433 @TmfSignalHandler
434 public void endSynch(TmfEndSynchSignal signal) {
435 synchronized (fLock) {
436 fSignalDepth--;
437 if (fSignalDepth == 0) {
438 fIsTimeout = false;
439 fireRequest();
440 }
441 }
442 }
443
444 }
This page took 0.044345 seconds and 6 git commands to generate.