Commit | Line | Data |
---|---|---|
79e0a1df | 1 | /******************************************************************************* |
ed902a2b | 2 | * Copyright (c) 2012, 2015 Ericsson |
79e0a1df AM |
3 | * |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Alexandre Montplaisir - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
2bdf0193 | 13 | package org.eclipse.tracecompass.tmf.core.statesystem; |
79e0a1df AM |
14 | |
15 | import java.util.concurrent.ArrayBlockingQueue; | |
16 | import java.util.concurrent.BlockingQueue; | |
17 | ||
d0c7e4ba | 18 | import org.eclipse.jdt.annotation.Nullable; |
e894a508 AM |
19 | import org.eclipse.tracecompass.statesystem.core.ITmfStateSystem; |
20 | import org.eclipse.tracecompass.statesystem.core.ITmfStateSystemBuilder; | |
2bdf0193 AM |
21 | import org.eclipse.tracecompass.tmf.core.event.ITmfEvent; |
22 | import org.eclipse.tracecompass.tmf.core.event.TmfEvent; | |
23 | import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp; | |
0c7471fb | 24 | import org.eclipse.tracecompass.tmf.core.trace.ITmfContext; |
2bdf0193 | 25 | import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace; |
79e0a1df | 26 | |
79e0a1df AM |
27 | /** |
28 | * Instead of using IStateChangeInput directly, one can extend this class, which | |
29 | * defines a lot of the common functions of the state change input plugin. | |
30 | * | |
31 | * It will handle the state-system-processing in a separate thread, which is | |
32 | * normally not a bad idea for traces of some size. | |
33 | * | |
34 | * processEvent() is replaced with eventHandle(), so that all the multi-thread | |
35 | * logic is abstracted away. | |
36 | * | |
37 | * @author Alexandre Montplaisir | |
38 | * @since 2.0 | |
39 | */ | |
0fe46f2a | 40 | public abstract class AbstractTmfStateProvider implements ITmfStateProvider { |
79e0a1df AM |
41 | |
42 | private static final int DEFAULT_EVENTS_QUEUE_SIZE = 10000; | |
43 | ||
79e0a1df | 44 | private final ITmfTrace trace; |
79044a66 AM |
45 | private final Class<? extends ITmfEvent> eventType; |
46 | private final BlockingQueue<ITmfEvent> eventsQueue; | |
79e0a1df AM |
47 | private final Thread eventHandlerThread; |
48 | ||
49 | private boolean ssAssigned; | |
79e0a1df | 50 | |
6f4e8ec0 | 51 | /** State system in which to insert the state changes */ |
d0c7e4ba | 52 | private @Nullable ITmfStateSystemBuilder ss = null; |
6f4e8ec0 | 53 | |
79e0a1df AM |
54 | /** |
55 | * Instantiate a new state provider plugin. | |
56 | * | |
57 | * @param trace | |
58 | * The LTTng 2.0 kernel trace directory | |
79044a66 AM |
59 | * @param eventType |
60 | * The specific class for the event type that will be used within | |
61 | * the subclass | |
71f2da63 AM |
62 | * @param id |
63 | * Name given to this state change input. Only used internally. | |
79e0a1df | 64 | */ |
0fe46f2a | 65 | public AbstractTmfStateProvider(ITmfTrace trace, |
71f2da63 | 66 | Class<? extends ITmfEvent> eventType, String id) { |
79e0a1df | 67 | this.trace = trace; |
79044a66 | 68 | this.eventType = eventType; |
a4524c1b | 69 | eventsQueue = new ArrayBlockingQueue<>(DEFAULT_EVENTS_QUEUE_SIZE); |
79044a66 | 70 | ssAssigned = false; |
71f2da63 | 71 | |
d0c7e4ba AM |
72 | eventHandlerThread = new Thread(new EventProcessor(), id + " Event Handler"); //$NON-NLS-1$ |
73 | } | |
74 | ||
75 | /** | |
76 | * Get the state system builder of this provider (to insert states in). | |
77 | * | |
78 | * @return The state system object to be filled | |
79 | */ | |
80 | protected @Nullable ITmfStateSystemBuilder getStateSystemBuilder() { | |
81 | return ss; | |
79e0a1df AM |
82 | } |
83 | ||
84 | @Override | |
85 | public ITmfTrace getTrace() { | |
86 | return trace; | |
87 | } | |
88 | ||
bcec0116 AM |
89 | /** |
90 | * @since 3.0 | |
91 | */ | |
79e0a1df AM |
92 | @Override |
93 | public long getStartTime() { | |
faa38350 | 94 | return trace.getStartTime().normalize(0, ITmfTimestamp.NANOSECOND_SCALE).getValue(); |
79e0a1df AM |
95 | } |
96 | ||
bcec0116 AM |
97 | /** |
98 | * @since 3.0 | |
99 | */ | |
79e0a1df | 100 | @Override |
f1f86dfb | 101 | public void assignTargetStateSystem(ITmfStateSystemBuilder ssb) { |
79e0a1df AM |
102 | ss = ssb; |
103 | ssAssigned = true; | |
104 | eventHandlerThread.start(); | |
105 | } | |
106 | ||
bcec0116 AM |
107 | /** |
108 | * @since 3.0 | |
109 | */ | |
7e634be6 | 110 | @Override |
d0c7e4ba | 111 | public @Nullable ITmfStateSystem getAssignedStateSystem() { |
7e634be6 AM |
112 | return ss; |
113 | } | |
114 | ||
79e0a1df AM |
115 | @Override |
116 | public void dispose() { | |
117 | /* Insert a null event in the queue to stop the event handler's thread. */ | |
118 | try { | |
aca5f650 | 119 | eventsQueue.put(END_EVENT); |
79e0a1df AM |
120 | eventHandlerThread.join(); |
121 | } catch (InterruptedException e) { | |
122 | e.printStackTrace(); | |
123 | } | |
124 | ssAssigned = false; | |
125 | ss = null; | |
126 | } | |
127 | ||
128 | @Override | |
79044a66 AM |
129 | public final Class<? extends ITmfEvent> getExpectedEventType() { |
130 | return eventType; | |
131 | } | |
132 | ||
133 | @Override | |
134 | public final void processEvent(ITmfEvent event) { | |
79e0a1df AM |
135 | /* Make sure the target state system has been assigned */ |
136 | if (!ssAssigned) { | |
137 | System.err.println("Cannot process event without a target state system"); //$NON-NLS-1$ | |
138 | return; | |
139 | } | |
140 | ||
141 | /* Insert the event we're received into the events queue */ | |
142 | ITmfEvent curEvent = event; | |
143 | try { | |
144 | eventsQueue.put(curEvent); | |
145 | } catch (InterruptedException e) { | |
146 | e.printStackTrace(); | |
147 | } | |
148 | } | |
149 | ||
1b9d3765 AM |
150 | /** |
151 | * Block the caller until the events queue is empty. | |
152 | */ | |
153 | public void waitForEmptyQueue() { | |
154 | /* | |
155 | * We will first insert a dummy event that is guaranteed to not modify | |
156 | * the state. That way, when that event leaves the queue, we will know | |
157 | * for sure that the state system processed the preceding real event. | |
158 | */ | |
1b9d3765 | 159 | try { |
aca5f650 | 160 | eventsQueue.put(EMPTY_QUEUE_EVENT); |
1b9d3765 | 161 | while (!eventsQueue.isEmpty()) { |
6f04e06c | 162 | Thread.sleep(100); |
1b9d3765 AM |
163 | } |
164 | } catch (InterruptedException e) { | |
165 | e.printStackTrace(); | |
166 | } | |
167 | } | |
168 | ||
ef8dd5af | 169 | // ------------------------------------------------------------------------ |
aca5f650 | 170 | // Special event types |
ef8dd5af AM |
171 | // ------------------------------------------------------------------------ |
172 | ||
aca5f650 | 173 | /** Fake event indicating the build is over, and the provider should close */ |
0c7471fb AM |
174 | private static class EndEvent extends TmfEvent { |
175 | public EndEvent() { | |
e1de2fd4 | 176 | super(null, ITmfContext.UNKNOWN_RANK, null, null, null); |
0c7471fb AM |
177 | } |
178 | } | |
179 | ||
aca5f650 | 180 | /** Fake event indicating we want to clear the current queue */ |
0c7471fb AM |
181 | private static class EmptyQueueEvent extends TmfEvent { |
182 | public EmptyQueueEvent() { | |
e1de2fd4 | 183 | super(null, ITmfContext.UNKNOWN_RANK, null, null, null); |
0c7471fb AM |
184 | } |
185 | } | |
aca5f650 MK |
186 | |
187 | private static final EndEvent END_EVENT = new EndEvent(); | |
188 | private static final EmptyQueueEvent EMPTY_QUEUE_EVENT = new EmptyQueueEvent(); | |
189 | ||
190 | // ------------------------------------------------------------------------ | |
191 | // Inner classes | |
192 | // ------------------------------------------------------------------------ | |
ef8dd5af | 193 | |
79e0a1df AM |
194 | /** |
195 | * This is the runner class for the second thread, which will take the | |
196 | * events from the queue and pass them through the state system. | |
197 | */ | |
198 | private class EventProcessor implements Runnable { | |
199 | ||
d0c7e4ba | 200 | private @Nullable ITmfEvent currentEvent; |
e8b7cc14 | 201 | |
79e0a1df AM |
202 | @Override |
203 | public void run() { | |
204 | if (ss == null) { | |
205 | System.err.println("Cannot run event manager without assigning a target state system first!"); //$NON-NLS-1$ | |
206 | return; | |
207 | } | |
208 | ITmfEvent event; | |
209 | ||
210 | try { | |
211 | event = eventsQueue.take(); | |
aca5f650 MK |
212 | /* This is a singleton, we want to do != instead of !x.equals */ |
213 | while (event != END_EVENT) { | |
214 | if (event == EMPTY_QUEUE_EVENT) { | |
ef8dd5af AM |
215 | /* Synchronization event, should be ignored */ |
216 | event = eventsQueue.take(); | |
217 | continue; | |
218 | } | |
219 | ||
79e0a1df | 220 | currentEvent = event; |
79044a66 AM |
221 | |
222 | /* Make sure this is an event the sub-class can process */ | |
1b9d3765 | 223 | if (eventType.isInstance(event) && event.getType() != null) { |
79044a66 AM |
224 | eventHandle(event); |
225 | } | |
79e0a1df AM |
226 | event = eventsQueue.take(); |
227 | } | |
228 | /* We've received the last event, clean up */ | |
229 | closeStateSystem(); | |
79e0a1df AM |
230 | } catch (InterruptedException e) { |
231 | /* We've been interrupted abnormally */ | |
232 | System.out.println("Event handler interrupted!"); //$NON-NLS-1$ | |
233 | e.printStackTrace(); | |
234 | } | |
235 | } | |
236 | ||
237 | private void closeStateSystem() { | |
d0c7e4ba AM |
238 | ITmfEvent event = currentEvent; |
239 | final long endTime = (event == null) ? 0 : | |
240 | event.getTimestamp().normalize(0, ITmfTimestamp.NANOSECOND_SCALE).getValue(); | |
241 | ||
242 | if (ss != null) { | |
243 | ss.closeHistory(endTime); | |
244 | } | |
79e0a1df AM |
245 | } |
246 | } | |
247 | ||
248 | // ------------------------------------------------------------------------ | |
249 | // Abstract methods | |
250 | // ------------------------------------------------------------------------ | |
251 | ||
79e0a1df AM |
252 | /** |
253 | * Handle the given event and send the appropriate state transitions into | |
254 | * the the state system. | |
255 | * | |
256 | * This is basically the same thing as IStateChangeInput.processEvent(), | |
257 | * except here processEvent() and eventHandle() are run in two different | |
258 | * threads (and the AbstractStateChangeInput takes care of processEvent() | |
259 | * already). | |
260 | * | |
261 | * @param event | |
262 | * The event to process. If you need a specific event type, you | |
263 | * should check for its instance right at the beginning. | |
264 | */ | |
265 | protected abstract void eventHandle(ITmfEvent event); | |
266 | } |