Commit | Line | Data |
---|---|---|
79e0a1df | 1 | /******************************************************************************* |
ed902a2b | 2 | * Copyright (c) 2012, 2015 Ericsson |
79e0a1df AM |
3 | * |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Alexandre Montplaisir - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
2bdf0193 | 13 | package org.eclipse.tracecompass.tmf.core.statesystem; |
79e0a1df | 14 | |
e2bcc8a5 | 15 | import org.eclipse.jdt.annotation.NonNull; |
d0c7e4ba | 16 | import org.eclipse.jdt.annotation.Nullable; |
423cf6a4 | 17 | import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue; |
1ff5d16c | 18 | import org.eclipse.tracecompass.internal.tmf.core.Activator; |
e894a508 AM |
19 | import org.eclipse.tracecompass.statesystem.core.ITmfStateSystem; |
20 | import org.eclipse.tracecompass.statesystem.core.ITmfStateSystemBuilder; | |
2bdf0193 AM |
21 | import org.eclipse.tracecompass.tmf.core.event.ITmfEvent; |
22 | import org.eclipse.tracecompass.tmf.core.event.TmfEvent; | |
0c7471fb | 23 | import org.eclipse.tracecompass.tmf.core.trace.ITmfContext; |
2bdf0193 | 24 | import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace; |
79e0a1df | 25 | |
79e0a1df AM |
26 | /** |
27 | * Instead of using IStateChangeInput directly, one can extend this class, which | |
28 | * defines a lot of the common functions of the state change input plugin. | |
29 | * | |
30 | * It will handle the state-system-processing in a separate thread, which is | |
31 | * normally not a bad idea for traces of some size. | |
32 | * | |
33 | * processEvent() is replaced with eventHandle(), so that all the multi-thread | |
34 | * logic is abstracted away. | |
35 | * | |
36 | * @author Alexandre Montplaisir | |
79e0a1df | 37 | */ |
0fe46f2a | 38 | public abstract class AbstractTmfStateProvider implements ITmfStateProvider { |
79e0a1df | 39 | |
423cf6a4 MK |
40 | private static final int DEFAULT_EVENTS_QUEUE_SIZE = 127; |
41 | private static final int DEFAULT_EVENTS_CHUNK_SIZE = 127; | |
79e0a1df | 42 | |
086cd39c | 43 | private final ITmfTrace fTrace; |
423cf6a4 | 44 | private final BufferedBlockingQueue<ITmfEvent> fEventsQueue; |
086cd39c | 45 | private final Thread fEventHandlerThread; |
79e0a1df | 46 | |
086cd39c | 47 | private boolean fStateSystemAssigned; |
6f4e8ec0 | 48 | /** State system in which to insert the state changes */ |
086cd39c | 49 | private @Nullable ITmfStateSystemBuilder fSS = null; |
6f4e8ec0 | 50 | |
91dc8d51 GB |
51 | /* The last safe time at which this state provider can be queried */ |
52 | private volatile long fSafeTime; | |
53 | ||
79e0a1df AM |
54 | /** |
55 | * Instantiate a new state provider plugin. | |
56 | * | |
57 | * @param trace | |
58 | * The LTTng 2.0 kernel trace directory | |
71f2da63 AM |
59 | * @param id |
60 | * Name given to this state change input. Only used internally. | |
79e0a1df | 61 | */ |
e2bcc8a5 | 62 | public AbstractTmfStateProvider(ITmfTrace trace, String id) { |
086cd39c | 63 | fTrace = trace; |
423cf6a4 | 64 | fEventsQueue = new BufferedBlockingQueue<>(DEFAULT_EVENTS_QUEUE_SIZE, DEFAULT_EVENTS_CHUNK_SIZE); |
086cd39c | 65 | fStateSystemAssigned = false; |
91dc8d51 GB |
66 | // set the safe time to before the trace start, the analysis has not yet started |
67 | fSafeTime = trace.getStartTime().toNanos() - 1; | |
71f2da63 | 68 | |
086cd39c | 69 | fEventHandlerThread = new Thread(new EventProcessor(), id + " Event Handler"); //$NON-NLS-1$ |
d0c7e4ba AM |
70 | } |
71 | ||
72 | /** | |
73 | * Get the state system builder of this provider (to insert states in). | |
74 | * | |
75 | * @return The state system object to be filled | |
76 | */ | |
77 | protected @Nullable ITmfStateSystemBuilder getStateSystemBuilder() { | |
086cd39c | 78 | return fSS; |
79e0a1df AM |
79 | } |
80 | ||
81 | @Override | |
82 | public ITmfTrace getTrace() { | |
086cd39c | 83 | return fTrace; |
79e0a1df AM |
84 | } |
85 | ||
86 | @Override | |
87 | public long getStartTime() { | |
16801c72 | 88 | return fTrace.getStartTime().toNanos(); |
79e0a1df AM |
89 | } |
90 | ||
91dc8d51 GB |
91 | /** |
92 | * @since 2.0 | |
93 | */ | |
94 | @Override | |
95 | public long getLatestSafeTime() { | |
96 | return fSafeTime; | |
97 | } | |
98 | ||
79e0a1df | 99 | @Override |
f1f86dfb | 100 | public void assignTargetStateSystem(ITmfStateSystemBuilder ssb) { |
086cd39c MK |
101 | fSS = ssb; |
102 | fStateSystemAssigned = true; | |
103 | fEventHandlerThread.start(); | |
79e0a1df AM |
104 | } |
105 | ||
7e634be6 | 106 | @Override |
d0c7e4ba | 107 | public @Nullable ITmfStateSystem getAssignedStateSystem() { |
086cd39c | 108 | return fSS; |
7e634be6 AM |
109 | } |
110 | ||
79e0a1df AM |
111 | @Override |
112 | public void dispose() { | |
113 | /* Insert a null event in the queue to stop the event handler's thread. */ | |
114 | try { | |
086cd39c | 115 | fEventsQueue.put(END_EVENT); |
423cf6a4 | 116 | fEventsQueue.flushInputBuffer(); |
086cd39c | 117 | fEventHandlerThread.join(); |
79e0a1df AM |
118 | } catch (InterruptedException e) { |
119 | e.printStackTrace(); | |
120 | } | |
086cd39c MK |
121 | fStateSystemAssigned = false; |
122 | fSS = null; | |
79e0a1df AM |
123 | } |
124 | ||
79044a66 AM |
125 | @Override |
126 | public final void processEvent(ITmfEvent event) { | |
79e0a1df | 127 | /* Make sure the target state system has been assigned */ |
086cd39c | 128 | if (!fStateSystemAssigned) { |
1ff5d16c | 129 | Activator.logError("Cannot process event without a target state system"); //$NON-NLS-1$ |
79e0a1df AM |
130 | return; |
131 | } | |
132 | ||
133 | /* Insert the event we're received into the events queue */ | |
134 | ITmfEvent curEvent = event; | |
423cf6a4 | 135 | fEventsQueue.put(curEvent); |
79e0a1df AM |
136 | } |
137 | ||
1b9d3765 AM |
138 | /** |
139 | * Block the caller until the events queue is empty. | |
140 | */ | |
141 | public void waitForEmptyQueue() { | |
142 | /* | |
143 | * We will first insert a dummy event that is guaranteed to not modify | |
144 | * the state. That way, when that event leaves the queue, we will know | |
145 | * for sure that the state system processed the preceding real event. | |
146 | */ | |
1b9d3765 | 147 | try { |
086cd39c | 148 | fEventsQueue.put(EMPTY_QUEUE_EVENT); |
423cf6a4 | 149 | fEventsQueue.flushInputBuffer(); |
086cd39c | 150 | while (!fEventsQueue.isEmpty()) { |
6f04e06c | 151 | Thread.sleep(100); |
1b9d3765 AM |
152 | } |
153 | } catch (InterruptedException e) { | |
154 | e.printStackTrace(); | |
155 | } | |
156 | } | |
157 | ||
ef8dd5af | 158 | // ------------------------------------------------------------------------ |
aca5f650 | 159 | // Special event types |
ef8dd5af AM |
160 | // ------------------------------------------------------------------------ |
161 | ||
aca5f650 | 162 | /** Fake event indicating the build is over, and the provider should close */ |
0c7471fb AM |
163 | private static class EndEvent extends TmfEvent { |
164 | public EndEvent() { | |
e1de2fd4 | 165 | super(null, ITmfContext.UNKNOWN_RANK, null, null, null); |
0c7471fb AM |
166 | } |
167 | } | |
168 | ||
aca5f650 | 169 | /** Fake event indicating we want to clear the current queue */ |
0c7471fb AM |
170 | private static class EmptyQueueEvent extends TmfEvent { |
171 | public EmptyQueueEvent() { | |
e1de2fd4 | 172 | super(null, ITmfContext.UNKNOWN_RANK, null, null, null); |
0c7471fb AM |
173 | } |
174 | } | |
aca5f650 MK |
175 | |
176 | private static final EndEvent END_EVENT = new EndEvent(); | |
177 | private static final EmptyQueueEvent EMPTY_QUEUE_EVENT = new EmptyQueueEvent(); | |
178 | ||
179 | // ------------------------------------------------------------------------ | |
180 | // Inner classes | |
181 | // ------------------------------------------------------------------------ | |
ef8dd5af | 182 | |
79e0a1df AM |
183 | /** |
184 | * This is the runner class for the second thread, which will take the | |
185 | * events from the queue and pass them through the state system. | |
186 | */ | |
187 | private class EventProcessor implements Runnable { | |
188 | ||
d0c7e4ba | 189 | private @Nullable ITmfEvent currentEvent; |
e8b7cc14 | 190 | |
79e0a1df AM |
191 | @Override |
192 | public void run() { | |
086cd39c | 193 | if (!fStateSystemAssigned) { |
1ff5d16c | 194 | Activator.logError("Cannot run event manager without assigning a target state system first!"); //$NON-NLS-1$ |
79e0a1df AM |
195 | return; |
196 | } | |
79e0a1df | 197 | |
423cf6a4 MK |
198 | |
199 | /* | |
200 | * We never insert null in the queue. Cannot be checked at | |
201 | * compile-time until Java 8 annotations... | |
202 | */ | |
0e4f957e | 203 | @NonNull ITmfEvent event = fEventsQueue.take(); |
423cf6a4 MK |
204 | /* This is a singleton, we want to do != instead of !x.equals */ |
205 | while (event != END_EVENT) { | |
206 | if (event == EMPTY_QUEUE_EVENT) { | |
207 | /* Synchronization event, should be ignored */ | |
0e4f957e | 208 | event = fEventsQueue.take(); |
423cf6a4 | 209 | continue; |
79e0a1df | 210 | } |
423cf6a4 | 211 | currentEvent = event; |
91dc8d51 | 212 | fSafeTime = event.getTimestamp().toNanos() - 1; |
423cf6a4 | 213 | eventHandle(event); |
0e4f957e | 214 | event = fEventsQueue.take(); |
79e0a1df | 215 | } |
423cf6a4 MK |
216 | /* We've received the last event, clean up */ |
217 | closeStateSystem(); | |
79e0a1df AM |
218 | } |
219 | ||
220 | private void closeStateSystem() { | |
d0c7e4ba AM |
221 | ITmfEvent event = currentEvent; |
222 | final long endTime = (event == null) ? 0 : | |
16801c72 | 223 | event.getTimestamp().toNanos(); |
d0c7e4ba | 224 | |
086cd39c MK |
225 | if (fSS != null) { |
226 | fSS.closeHistory(endTime); | |
d0c7e4ba | 227 | } |
79e0a1df AM |
228 | } |
229 | } | |
230 | ||
231 | // ------------------------------------------------------------------------ | |
232 | // Abstract methods | |
233 | // ------------------------------------------------------------------------ | |
234 | ||
79e0a1df AM |
235 | /** |
236 | * Handle the given event and send the appropriate state transitions into | |
237 | * the the state system. | |
238 | * | |
239 | * This is basically the same thing as IStateChangeInput.processEvent(), | |
240 | * except here processEvent() and eventHandle() are run in two different | |
241 | * threads (and the AbstractStateChangeInput takes care of processEvent() | |
242 | * already). | |
243 | * | |
244 | * @param event | |
245 | * The event to process. If you need a specific event type, you | |
246 | * should check for its instance right at the beginning. | |
247 | */ | |
248 | protected abstract void eventHandle(ITmfEvent event); | |
086cd39c | 249 | |
79e0a1df | 250 | } |