Commit | Line | Data |
---|---|---|
79e0a1df | 1 | /******************************************************************************* |
61759503 | 2 | * Copyright (c) 2012, 2013 Ericsson |
79e0a1df AM |
3 | * |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Alexandre Montplaisir - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
13 | package org.eclipse.linuxtools.tmf.core.statesystem; | |
14 | ||
15 | import java.util.concurrent.ArrayBlockingQueue; | |
16 | import java.util.concurrent.BlockingQueue; | |
17 | ||
6cfa0200 | 18 | import org.eclipse.linuxtools.tmf.core.ctfadaptor.CtfTmfEventFactory; |
79e0a1df | 19 | import org.eclipse.linuxtools.tmf.core.event.ITmfEvent; |
1b9d3765 | 20 | import org.eclipse.linuxtools.tmf.core.event.TmfEvent; |
79e0a1df | 21 | import org.eclipse.linuxtools.tmf.core.exceptions.TimeRangeException; |
3bd46eef | 22 | import org.eclipse.linuxtools.tmf.core.timestamp.ITmfTimestamp; |
1b9d3765 | 23 | import org.eclipse.linuxtools.tmf.core.timestamp.TmfTimestamp; |
79e0a1df AM |
24 | import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace; |
25 | ||
26 | ||
27 | /** | |
28 | * Instead of using IStateChangeInput directly, one can extend this class, which | |
29 | * defines a lot of the common functions of the state change input plugin. | |
30 | * | |
31 | * It will handle the state-system-processing in a separate thread, which is | |
32 | * normally not a bad idea for traces of some size. | |
33 | * | |
34 | * processEvent() is replaced with eventHandle(), so that all the multi-thread | |
35 | * logic is abstracted away. | |
36 | * | |
37 | * @author Alexandre Montplaisir | |
38 | * @since 2.0 | |
39 | */ | |
0fe46f2a | 40 | public abstract class AbstractTmfStateProvider implements ITmfStateProvider { |
79e0a1df AM |
41 | |
42 | private static final int DEFAULT_EVENTS_QUEUE_SIZE = 10000; | |
43 | ||
79e0a1df | 44 | private final ITmfTrace trace; |
79044a66 AM |
45 | private final Class<? extends ITmfEvent> eventType; |
46 | private final BlockingQueue<ITmfEvent> eventsQueue; | |
79e0a1df AM |
47 | private final Thread eventHandlerThread; |
48 | ||
49 | private boolean ssAssigned; | |
79e0a1df AM |
50 | private ITmfEvent currentEvent; |
51 | ||
6f4e8ec0 | 52 | /** State system in which to insert the state changes */ |
339d27b4 | 53 | protected ITmfStateSystemBuilder ss = null; |
6f4e8ec0 | 54 | |
79e0a1df AM |
55 | /** |
56 | * Instantiate a new state provider plugin. | |
57 | * | |
58 | * @param trace | |
59 | * The LTTng 2.0 kernel trace directory | |
79044a66 AM |
60 | * @param eventType |
61 | * The specific class for the event type that will be used within | |
62 | * the subclass | |
71f2da63 AM |
63 | * @param id |
64 | * Name given to this state change input. Only used internally. | |
79e0a1df | 65 | */ |
0fe46f2a | 66 | public AbstractTmfStateProvider(ITmfTrace trace, |
71f2da63 | 67 | Class<? extends ITmfEvent> eventType, String id) { |
79e0a1df | 68 | this.trace = trace; |
79044a66 AM |
69 | this.eventType = eventType; |
70 | eventsQueue = new ArrayBlockingQueue<ITmfEvent>(DEFAULT_EVENTS_QUEUE_SIZE); | |
79044a66 | 71 | ssAssigned = false; |
71f2da63 AM |
72 | |
73 | String id2 = (id == null ? "Unamed" : id); //$NON-NLS-1$ | |
74 | eventHandlerThread = new Thread(new EventProcessor(), id2 + " Event Handler"); //$NON-NLS-1$ | |
75 | ||
79e0a1df AM |
76 | } |
77 | ||
78 | @Override | |
79 | public ITmfTrace getTrace() { | |
80 | return trace; | |
81 | } | |
82 | ||
83 | @Override | |
84 | public long getStartTime() { | |
faa38350 | 85 | return trace.getStartTime().normalize(0, ITmfTimestamp.NANOSECOND_SCALE).getValue(); |
79e0a1df AM |
86 | } |
87 | ||
88 | @Override | |
f1f86dfb | 89 | public void assignTargetStateSystem(ITmfStateSystemBuilder ssb) { |
79e0a1df AM |
90 | ss = ssb; |
91 | ssAssigned = true; | |
92 | eventHandlerThread.start(); | |
93 | } | |
94 | ||
7e634be6 AM |
95 | @Override |
96 | public ITmfStateSystem getAssignedStateSystem() { | |
97 | return ss; | |
98 | } | |
99 | ||
79e0a1df AM |
100 | @Override |
101 | public void dispose() { | |
102 | /* Insert a null event in the queue to stop the event handler's thread. */ | |
103 | try { | |
6cfa0200 | 104 | eventsQueue.put(CtfTmfEventFactory.getNullEvent()); |
79e0a1df AM |
105 | eventHandlerThread.join(); |
106 | } catch (InterruptedException e) { | |
107 | e.printStackTrace(); | |
108 | } | |
109 | ssAssigned = false; | |
110 | ss = null; | |
111 | } | |
112 | ||
113 | @Override | |
79044a66 AM |
114 | public final Class<? extends ITmfEvent> getExpectedEventType() { |
115 | return eventType; | |
116 | } | |
117 | ||
118 | @Override | |
119 | public final void processEvent(ITmfEvent event) { | |
79e0a1df AM |
120 | /* Make sure the target state system has been assigned */ |
121 | if (!ssAssigned) { | |
122 | System.err.println("Cannot process event without a target state system"); //$NON-NLS-1$ | |
123 | return; | |
124 | } | |
125 | ||
126 | /* Insert the event we're received into the events queue */ | |
127 | ITmfEvent curEvent = event; | |
128 | try { | |
129 | eventsQueue.put(curEvent); | |
130 | } catch (InterruptedException e) { | |
131 | e.printStackTrace(); | |
132 | } | |
133 | } | |
134 | ||
1b9d3765 AM |
135 | /** |
136 | * Block the caller until the events queue is empty. | |
137 | */ | |
138 | public void waitForEmptyQueue() { | |
139 | /* | |
140 | * We will first insert a dummy event that is guaranteed to not modify | |
141 | * the state. That way, when that event leaves the queue, we will know | |
142 | * for sure that the state system processed the preceding real event. | |
143 | */ | |
ef8dd5af | 144 | TmfEvent ev = new EmptyEvent(); |
1b9d3765 AM |
145 | |
146 | try { | |
147 | eventsQueue.put(ev); | |
148 | while (!eventsQueue.isEmpty()) { | |
149 | Thread.sleep(100); | |
150 | } | |
151 | } catch (InterruptedException e) { | |
152 | e.printStackTrace(); | |
153 | } | |
154 | } | |
155 | ||
ef8dd5af AM |
156 | // ------------------------------------------------------------------------ |
157 | // Inner classes | |
158 | // ------------------------------------------------------------------------ | |
159 | ||
160 | /** | |
161 | * Empty event that should be totally ignored by the event handler. It can | |
162 | * by used for synchronisation purposes. | |
163 | */ | |
164 | private class EmptyEvent extends TmfEvent { | |
165 | public EmptyEvent() { | |
166 | super(null, new TmfTimestamp(0), null, null, null, null); | |
167 | } | |
168 | } | |
169 | ||
79e0a1df AM |
170 | /** |
171 | * This is the runner class for the second thread, which will take the | |
172 | * events from the queue and pass them through the state system. | |
173 | */ | |
174 | private class EventProcessor implements Runnable { | |
175 | ||
176 | @Override | |
177 | public void run() { | |
178 | if (ss == null) { | |
179 | System.err.println("Cannot run event manager without assigning a target state system first!"); //$NON-NLS-1$ | |
180 | return; | |
181 | } | |
182 | ITmfEvent event; | |
183 | ||
184 | try { | |
185 | event = eventsQueue.take(); | |
186 | while (event.getTimestamp().getValue() != -1) { | |
ef8dd5af AM |
187 | if (event instanceof EmptyEvent) { |
188 | /* Synchronization event, should be ignored */ | |
189 | event = eventsQueue.take(); | |
190 | continue; | |
191 | } | |
192 | ||
79e0a1df | 193 | currentEvent = event; |
79044a66 AM |
194 | |
195 | /* Make sure this is an event the sub-class can process */ | |
1b9d3765 | 196 | if (eventType.isInstance(event) && event.getType() != null) { |
79044a66 AM |
197 | eventHandle(event); |
198 | } | |
79e0a1df AM |
199 | event = eventsQueue.take(); |
200 | } | |
201 | /* We've received the last event, clean up */ | |
202 | closeStateSystem(); | |
203 | return; | |
204 | } catch (InterruptedException e) { | |
205 | /* We've been interrupted abnormally */ | |
206 | System.out.println("Event handler interrupted!"); //$NON-NLS-1$ | |
207 | e.printStackTrace(); | |
208 | } | |
209 | } | |
210 | ||
211 | private void closeStateSystem() { | |
212 | /* Close the History system, if there is one */ | |
213 | if (currentEvent == null) { | |
214 | return; | |
215 | } | |
216 | try { | |
faa38350 | 217 | ss.closeHistory(currentEvent.getTimestamp().normalize(0, ITmfTimestamp.NANOSECOND_SCALE).getValue()); |
79e0a1df AM |
218 | } catch (TimeRangeException e) { |
219 | /* | |
220 | * Since we're using currentEvent.getTimestamp, this shouldn't | |
221 | * cause any problem | |
222 | */ | |
223 | e.printStackTrace(); | |
224 | } | |
225 | } | |
226 | } | |
227 | ||
228 | // ------------------------------------------------------------------------ | |
229 | // Abstract methods | |
230 | // ------------------------------------------------------------------------ | |
231 | ||
79e0a1df AM |
232 | /** |
233 | * Handle the given event and send the appropriate state transitions into | |
234 | * the the state system. | |
235 | * | |
236 | * This is basically the same thing as IStateChangeInput.processEvent(), | |
237 | * except here processEvent() and eventHandle() are run in two different | |
238 | * threads (and the AbstractStateChangeInput takes care of processEvent() | |
239 | * already). | |
240 | * | |
241 | * @param event | |
242 | * The event to process. If you need a specific event type, you | |
243 | * should check for its instance right at the beginning. | |
244 | */ | |
245 | protected abstract void eventHandle(ITmfEvent event); | |
246 | } |