1 /*******************************************************************************
2 * Copyright (c) 2012, 2015 Ericsson
3 * Copyright (c) 2010, 2011 École Polytechnique de Montréal
4 * Copyright (c) 2010, 2011 Alexandre Montplaisir <alexandre.montplaisir@gmail.com>
6 * All rights reserved. This program and the accompanying materials are
7 * made available under the terms of the Eclipse Public License v1.0 which
8 * accompanies this distribution, and is available at
9 * http://www.eclipse.org/legal/epl-v10.html
12 * Alexandre Montplaisir - Initial API and implementation
13 *******************************************************************************/
15 package org
.eclipse
.tracecompass
.internal
.statesystem
.core
.backend
.historytree
;
18 import java
.io
.IOException
;
19 import java
.util
.List
;
21 import org
.eclipse
.jdt
.annotation
.NonNull
;
22 import org
.eclipse
.tracecompass
.common
.core
.collect
.BufferedBlockingQueue
;
23 import org
.eclipse
.tracecompass
.internal
.statesystem
.core
.Activator
;
24 import org
.eclipse
.tracecompass
.statesystem
.core
.exceptions
.StateSystemDisposedException
;
25 import org
.eclipse
.tracecompass
.statesystem
.core
.exceptions
.TimeRangeException
;
26 import org
.eclipse
.tracecompass
.statesystem
.core
.interval
.ITmfStateInterval
;
27 import org
.eclipse
.tracecompass
.statesystem
.core
.statevalue
.ITmfStateValue
;
28 import org
.eclipse
.tracecompass
.statesystem
.core
.statevalue
.TmfStateValue
;
31 * Variant of the HistoryTreeBackend which runs all the interval-insertion logic
32 * in a separate thread.
34 * @author Alexandre Montplaisir
36 public final class ThreadedHistoryTreeBackend
extends HistoryTreeBackend
39 private static final int CHUNK_SIZE
= 127;
40 private final @NonNull BufferedBlockingQueue
<HTInterval
> intervalQueue
;
41 private final @NonNull Thread shtThread
;
44 * New state history constructor
46 * Note that it usually doesn't make sense to use a Threaded HT if you're
47 * opening an existing state-file, but you know what you're doing...
50 * The state system's id
52 * The name of the history file that will be created. Should end
54 * @param providerVersion
55 * Version of of the state provider. We will only try to reopen
56 * existing files if this version matches the one in the
59 * The earliest timestamp stored in the history
61 * The size of the interval insertion queue. 2000 - 10000 usually
64 * The size of the blocks in the file
66 * The maximum number of children allowed for each core node
68 * If there was a problem opening the history file for writing
70 public ThreadedHistoryTreeBackend(@NonNull String ssid
,
78 super(ssid
, newStateFile
, providerVersion
, startTime
, blockSize
, maxChildren
);
80 intervalQueue
= new BufferedBlockingQueue
<>(queueSize
/ CHUNK_SIZE
, CHUNK_SIZE
);
81 shtThread
= new Thread(this, "History Tree Thread"); //$NON-NLS-1$
86 * New State History constructor. This version provides default values for
87 * blockSize and maxChildren.
90 * The state system's id
92 * The name of the history file that will be created. Should end
94 * @param providerVersion
95 * Version of of the state provider. We will only try to reopen
96 * existing files if this version matches the one in the
99 * The earliest timestamp stored in the history
101 * The size of the interval insertion queue. 2000 - 10000 usually
103 * @throws IOException
104 * If there was a problem opening the history file for writing
106 public ThreadedHistoryTreeBackend(@NonNull String ssid
,
112 super(ssid
, newStateFile
, providerVersion
, startTime
);
114 intervalQueue
= new BufferedBlockingQueue
<>(queueSize
/ CHUNK_SIZE
, CHUNK_SIZE
);
115 shtThread
= new Thread(this, "History Tree Thread"); //$NON-NLS-1$
120 * The Threaded version does not specify an "existing file" constructor,
121 * since the history is already built (and we only use the other thread
122 * during building). Just use a plain HistoryTreeProvider in this case.
124 * TODO but what about streaming??
128 public void insertPastState(long stateStartTime
, long stateEndTime
,
129 int quark
, ITmfStateValue value
) throws TimeRangeException
{
131 * Here, instead of directly inserting the elements in the History Tree
132 * underneath, we'll put them in the Queue. They will then be taken and
133 * processed by the other thread executing the run() method.
135 HTInterval interval
= new HTInterval(stateStartTime
, stateEndTime
,
136 quark
, (TmfStateValue
) value
);
137 intervalQueue
.put(interval
);
141 public void finishedBuilding(long endTime
) {
143 * We need to commit everything in the History Tree and stop the
144 * standalone thread before returning to the StateHistorySystem. (SHS
145 * will then write the Attribute Tree to the file, that must not happen
146 * at the same time we are writing the last nodes!)
149 stopRunningThread(endTime
);
150 setFinishedBuilding(true);
155 public void dispose() {
156 if (!isFinishedBuilding()) {
157 stopRunningThread(Long
.MAX_VALUE
);
160 * isFinishedBuilding remains false, so the superclass will ask the
161 * back-end to delete the file.
166 private void stopRunningThread(long endTime
) {
167 if (!shtThread
.isAlive()) {
172 * Send a "poison pill" in the queue, then wait for the HT to finish its
176 HTInterval pill
= new HTInterval(-1, endTime
, -1, TmfStateValue
.nullValue());
177 intervalQueue
.put(pill
);
178 intervalQueue
.flushInputBuffer();
180 } catch (TimeRangeException e
) {
181 Activator
.getDefault().logError("Error closing state system", e
); //$NON-NLS-1$
182 } catch (InterruptedException e
) {
183 Activator
.getDefault().logError("State system interrupted", e
); //$NON-NLS-1$
189 HTInterval currentInterval
;
191 currentInterval
= intervalQueue
.take();
192 while (currentInterval
.getStartTime() != -1) {
193 /* Send the interval to the History Tree */
194 getSHT().insertInterval(currentInterval
);
195 currentInterval
= intervalQueue
.take();
197 if (currentInterval
.getAttribute() != -1) {
198 /* Make sure this is the "poison pill" we are waiting for */
199 throw new IllegalStateException();
202 * We've been told we're done, let's write down everything and quit.
203 * The end time of this "signal interval" is actually correct.
205 getSHT().closeTree(currentInterval
.getEndTime());
207 } catch (TimeRangeException e
) {
208 /* This should not happen */
209 Activator
.getDefault().logError("Error starting the state system", e
); //$NON-NLS-1$
213 // ------------------------------------------------------------------------
215 // ------------------------------------------------------------------------
218 public void doQuery(List
<ITmfStateInterval
> currentStateInfo
, long t
)
219 throws TimeRangeException
, StateSystemDisposedException
{
220 super.doQuery(currentStateInfo
, t
);
222 if (isFinishedBuilding()) {
224 * The history tree is the only place to look for intervals once
225 * construction is finished.
231 * It is possible we may have missed some intervals due to them being in
232 * the queue while the query was ongoing. Go over the results to see if
235 for (int i
= 0; i
< currentStateInfo
.size(); i
++) {
236 if (currentStateInfo
.get(i
) == null) {
237 /* Query the missing interval via "unicast" */
238 ITmfStateInterval interval
= doSingularQuery(t
, i
);
239 currentStateInfo
.set(i
, interval
);
245 public ITmfStateInterval
doSingularQuery(long t
, int attributeQuark
)
246 throws TimeRangeException
, StateSystemDisposedException
{
247 ITmfStateInterval ret
= super.doSingularQuery(t
, attributeQuark
);
253 * We couldn't find the interval in the history tree. It's possible that
254 * it is currently in the intervalQueue. Look for it there. Note that
255 * BufferedBlockingQueue's iterator() is thread-safe (no need to lock
258 for (ITmfStateInterval interval
: intervalQueue
) {
259 if (interval
.getAttribute() == attributeQuark
&& interval
.intersects(t
)) {
265 * If we missed it again, it's because it got inserted in the tree
266 * *while we were iterating* on the queue. One last pass in the tree
269 * This case is really rare, which is why we do a second pass at the end
270 * if needed, instead of systematically checking in the queue first
273 return super.doSingularQuery(t
, attributeQuark
);