lttng: add names of traces in sync algorithm stats
[deliverable/tracecompass.git] / tmf / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / internal / tmf / core / component / TmfEventThread.java
1 /*******************************************************************************
2 * Copyright (c) 2012, 2015 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 * Bernd Hufmann - Update handling of suspend and resume
12 * Patrick Tasse - Exit early if request is cancelled
13 *******************************************************************************/
14
15 package org.eclipse.tracecompass.internal.tmf.core.component;
16
17 import java.util.concurrent.CountDownLatch;
18
19 import org.eclipse.tracecompass.internal.tmf.core.Activator;
20 import org.eclipse.tracecompass.internal.tmf.core.TmfCoreTracer;
21 import org.eclipse.tracecompass.tmf.core.component.ITmfEventProvider;
22 import org.eclipse.tracecompass.tmf.core.component.TmfEventProvider;
23 import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
24 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
25 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest.ExecutionType;
26 import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
27
28 /**
29 * Provides the core event request processor. It also has support for suspending
30 * and resuming a request in a thread-safe manner.
31 *
32 * @author Francois Chouinard
33 * @version 1.0
34 */
35 public class TmfEventThread implements Runnable {
36
37 // ------------------------------------------------------------------------
38 // Attributes
39 // ------------------------------------------------------------------------
40
41 /**
42 * The event provider
43 */
44 private final TmfEventProvider fProvider;
45
46 /**
47 * The wrapped event request
48 */
49 private final ITmfEventRequest fRequest;
50
51 /**
52 * The request execution priority
53 */
54 private final ExecutionType fExecType;
55
56 /**
57 * The wrapped thread (if applicable)
58 */
59 private final TmfEventThread fThread;
60
61 private volatile CountDownLatch fLatch = new CountDownLatch(1);
62
63 /**
64 * The thread execution state
65 */
66 private volatile boolean isCompleted = false;
67
68 /** The flag for suspending a thread */
69 private volatile boolean fIsPaused = false;
70
71 // ------------------------------------------------------------------------
72 // Constructor
73 // ------------------------------------------------------------------------
74
75 /**
76 * Basic constructor
77 *
78 * @param provider the event provider
79 * @param request the request to process
80 */
81 public TmfEventThread(TmfEventProvider provider, ITmfEventRequest request) {
82 assert provider != null;
83 assert request != null;
84 fProvider = provider;
85 fRequest = request;
86 fExecType = request.getExecType();
87 fThread = null;
88 }
89
90 /**
91 * Wrapper constructor
92 *
93 * @param thread the thread to wrap
94 */
95 public TmfEventThread(TmfEventThread thread) {
96 fProvider = thread.fProvider;
97 fRequest = thread.fRequest;
98 fExecType = thread.fExecType;
99 fThread = thread;
100 }
101
102 // ------------------------------------------------------------------------
103 // Getters
104 // ------------------------------------------------------------------------
105
106 /**
107 * @return The wrapped thread
108 */
109 public TmfEventThread getThread() {
110 return fThread;
111 }
112
113 /**
114 * @return The event provider
115 */
116 public ITmfEventProvider getProvider() {
117 return fProvider;
118 }
119
120 /**
121 * @return The event request
122 */
123 public ITmfEventRequest getRequest() {
124 return fRequest;
125 }
126
127 /**
128 * @return The request execution priority
129 */
130 public ExecutionType getExecType() {
131 return fExecType;
132 }
133
134 /**
135 * @return The request execution state
136 */
137 public boolean isRunning() {
138 return fRequest.isRunning() && !isPaused();
139 }
140
141 /**
142 * @return The request execution state
143 */
144 public boolean isPaused() {
145 return fIsPaused;
146 }
147
148 /**
149 * @return The request execution state
150 */
151 public boolean isCompleted() {
152 return isCompleted;
153 }
154
155 // ------------------------------------------------------------------------
156 // Runnable
157 // ------------------------------------------------------------------------
158
159 @Override
160 public void run() {
161
162 TmfCoreTracer.traceRequest(fRequest.getRequestId(), "is being serviced by " + fProvider.getName()); //$NON-NLS-1$
163
164 if (fRequest.isCancelled()) {
165 isCompleted = true;
166 return;
167 }
168
169 // Extract the generic information
170 fRequest.start();
171 int nbRequested = fRequest.getNbRequested();
172 int nbRead = 0;
173
174 // Initialize the execution
175 ITmfContext context = fProvider.armRequest(fRequest);
176 if (context == null) {
177 isCompleted = true;
178 fRequest.cancel();
179 return;
180 }
181
182 try {
183 // Get the ordered events
184 ITmfEvent event = fProvider.getNext(context);
185 TmfCoreTracer.traceRequest(fRequest.getRequestId(), "read first event"); //$NON-NLS-1$
186
187 while (event != null && !fProvider.isCompleted(fRequest, event, nbRead)) {
188
189 TmfCoreTracer.traceEvent(fProvider, fRequest, event);
190 if (fRequest.getDataType().isInstance(event)) {
191 fRequest.handleData(event);
192 }
193
194 // Pause execution if requested
195 while (fIsPaused) {
196 CountDownLatch latch = fLatch;
197 latch.await();
198 }
199
200 // To avoid an unnecessary read passed the last event requested
201 if (++nbRead < nbRequested) {
202 event = fProvider.getNext(context);
203 }
204 }
205
206 isCompleted = true;
207
208 if (fRequest.isCancelled()) {
209 fRequest.cancel();
210 } else {
211 fRequest.done();
212 }
213
214 } catch (Exception e) {
215 Activator.logError("Error in " + fProvider.getName() + " handling " + fRequest, e); //$NON-NLS-1$ //$NON-NLS-2$
216 isCompleted = true;
217 fRequest.fail(e);
218 }
219
220 // Cleanup
221 context.dispose();
222 }
223
224 // ------------------------------------------------------------------------
225 // Operations
226 // ------------------------------------------------------------------------
227
228 /**
229 * Suspend the thread
230 */
231 public void suspend() {
232 fIsPaused = true;
233 TmfCoreTracer.traceRequest(fRequest.getRequestId(), "SUSPENDED"); //$NON-NLS-1$
234 }
235
236 /**
237 * Resume the thread
238 */
239 public void resume() {
240 fIsPaused = false;
241
242 CountDownLatch oldLatch = fLatch;
243 fLatch = new CountDownLatch(1);
244 oldLatch.countDown();
245
246 TmfCoreTracer.traceRequest(fRequest.getRequestId(), "RESUMED"); //$NON-NLS-1$
247 }
248
249 /**
250 * Cancel the request
251 */
252 public void cancel() {
253 if (!fRequest.isCompleted()) {
254 fRequest.cancel();
255 }
256 }
257 }
This page took 0.048887 seconds and 5 git commands to generate.