tmf.ctf: Split the classes into proper packages
[deliverable/tracecompass.git] / org.eclipse.tracecompass.lttng2.control.ui / src / org / eclipse / tracecompass / internal / lttng2 / control / ui / relayd / LttngRelaydConsumer.java
1 /**********************************************************************
2 * Copyright (c) 2014 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial implementation
11 **********************************************************************/
12
13 package org.eclipse.tracecompass.internal.lttng2.control.ui.relayd;
14
15 import java.io.IOException;
16 import java.io.UnsupportedEncodingException;
17 import java.net.Socket;
18 import java.util.List;
19 import java.util.regex.Matcher;
20 import java.util.regex.Pattern;
21
22 import org.eclipse.core.runtime.CoreException;
23 import org.eclipse.core.runtime.IProgressMonitor;
24 import org.eclipse.core.runtime.IStatus;
25 import org.eclipse.core.runtime.Status;
26 import org.eclipse.core.runtime.jobs.Job;
27 import org.eclipse.tracecompass.ctf.core.trace.CTFTrace;
28 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.ILttngRelaydConnector;
29 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.LttngRelaydConnectorFactory;
30 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.lttngviewerCommands.AttachReturnCode;
31 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.lttngviewerCommands.AttachSessionResponse;
32 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.lttngviewerCommands.CreateSessionResponse;
33 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.lttngviewerCommands.CreateSessionReturnCode;
34 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.lttngviewerCommands.IndexResponse;
35 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.lttngviewerCommands.NextIndexReturnCode;
36 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.lttngviewerCommands.SessionResponse;
37 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.lttngviewerCommands.StreamResponse;
38 import org.eclipse.tracecompass.internal.lttng2.control.ui.Activator;
39 import org.eclipse.tracecompass.tmf.core.signal.TmfTraceRangeUpdatedSignal;
40 import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimeRange;
41 import org.eclipse.tracecompass.tmf.ctf.core.timestamp.CtfTmfTimestamp;
42 import org.eclipse.tracecompass.tmf.ctf.core.trace.CtfTmfTrace;
43
44 /**
45 * Consumer of the relay d.
46 *
47 * @author Matthew Khouzam
48 * @since 3.1
49 */
50 public final class LttngRelaydConsumer {
51
52 private static final Pattern PROTOCOL_HOST_PATTERN = Pattern.compile("(\\S+://)*(\\d+\\.\\d+\\.\\d+\\.\\d+)"); //$NON-NLS-1$
53 private static final int SIGNAL_THROTTLE_NANOSEC = 10_000_000;
54 private static final String ENCODING_UTF_8 = "UTF-8"; //$NON-NLS-1$
55
56 private Job fConsumerJob;
57 private CtfTmfTrace fCtfTmfTrace;
58 private CTFTrace fCtfTrace;
59 private long fTimestampEnd;
60 private AttachSessionResponse fSession;
61 private Socket fConnection;
62 private ILttngRelaydConnector fRelayd;
63 private String fTracePath;
64 private long fLastSignal = 0;
65 private final LttngRelaydConnectionInfo fConnectionInfo;
66
67 /**
68 * Start a lttng consumer.
69 *
70 * @param address
71 * the ip address in string format
72 * @param port
73 * the port, an integer
74 * @param sessionName
75 * the session name
76 * @param project
77 * the default project
78 */
79 LttngRelaydConsumer(final LttngRelaydConnectionInfo connectionInfo) {
80 fConnectionInfo = connectionInfo;
81 fTimestampEnd = 0;
82 }
83
84 /**
85 * Connects to the relayd at the given address and port then attaches to the
86 * given session name.
87 *
88 * @throws CoreException
89 * If something goes wrong during the connection
90 * <ul>
91 * <li>
92 * Connection could not be established (Socket could not be
93 * opened, etc)</li>
94 * <li>
95 * Connection timeout</li>
96 * <li>
97 * The session was not found</li>
98 * <li>
99 * Could not create viewer session</li>
100 * <li>
101 * Invalid trace (no metadata, no streams)</li>
102 * </ul>
103 */
104 public void connect() throws CoreException {
105 if (fConnection != null) {
106 return;
107 }
108
109 try {
110 Matcher matcher = PROTOCOL_HOST_PATTERN.matcher(fConnectionInfo.getHost());
111 String host = null;
112 if (matcher.matches()) {
113 host = matcher.group(2);
114 }
115
116 if (host == null || host.isEmpty()) {
117 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorConnecting));
118 }
119
120 fConnection = new Socket(host, fConnectionInfo.getPort());
121 fRelayd = LttngRelaydConnectorFactory.getNewConnector(fConnection);
122 List<SessionResponse> sessions = fRelayd.getSessions();
123 SessionResponse selectedSession = null;
124 for (SessionResponse session : sessions) {
125 String asessionName = nullTerminatedByteArrayToString(session.getSessionName().getBytes());
126
127 if (asessionName.equals(fConnectionInfo.getSessionName())) {
128 selectedSession = session;
129 break;
130 }
131 }
132
133 if (selectedSession == null) {
134 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_SessionNotFound));
135 }
136
137 CreateSessionResponse createSession = fRelayd.createSession();
138 if (createSession.getStatus() != CreateSessionReturnCode.LTTNG_VIEWER_CREATE_SESSION_OK) {
139 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_CreateViewerSessionError + createSession.getStatus().toString()));
140 }
141
142 AttachSessionResponse attachedSession = fRelayd.attachToSession(selectedSession);
143 if (attachedSession.getStatus() != AttachReturnCode.VIEWER_ATTACH_OK) {
144 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_AttachSessionError + attachedSession.getStatus().toString()));
145 }
146
147 String metadata = fRelayd.getMetadata(attachedSession);
148 if (metadata == null) {
149 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoMetadata));
150 }
151
152 List<StreamResponse> attachedStreams = attachedSession.getStreamList();
153 if (attachedStreams.isEmpty()) {
154 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoStreams));
155 }
156
157 fTracePath = nullTerminatedByteArrayToString(attachedStreams.get(0).getPathName().getBytes());
158
159 fSession = attachedSession;
160 } catch (IOException e) {
161 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorConnecting + (e.getMessage() != null ? e.getMessage() : ""))); //$NON-NLS-1$
162 }
163 }
164
165 /**
166 * Run the consumer operation for a give trace.
167 *
168 * @param trace
169 * the trace
170 */
171 public void run(final CtfTmfTrace trace) {
172 if (fSession == null) {
173 return;
174 }
175
176 fCtfTmfTrace = trace;
177 fCtfTrace = trace.getCTFTrace();
178 fConsumerJob = new Job("RelayD consumer") { //$NON-NLS-1$
179
180 @Override
181 protected IStatus run(final IProgressMonitor monitor) {
182 try {
183 while (!monitor.isCanceled()) {
184 List<StreamResponse> attachedStreams = fSession.getStreamList();
185 for (StreamResponse stream : attachedStreams) {
186 if (stream.getMetadataFlag() != 1) {
187 IndexResponse indexReply = fRelayd.getNextIndex(stream);
188 if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_OK) {
189 long nanoTimeStamp = fCtfTrace.timestampCyclesToNanos(indexReply.getTimestampEnd());
190 if (nanoTimeStamp > fTimestampEnd) {
191 CtfTmfTimestamp endTime = new CtfTmfTimestamp(nanoTimeStamp);
192 TmfTimeRange range = new TmfTimeRange(fCtfTmfTrace.getStartTime(), endTime);
193
194 long currentTime = System.nanoTime();
195 if (currentTime - fLastSignal > SIGNAL_THROTTLE_NANOSEC) {
196 TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, range);
197 fCtfTmfTrace.broadcastAsync(signal);
198 fLastSignal = currentTime;
199 }
200 fTimestampEnd = nanoTimeStamp;
201 }
202 } else if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_HUP) {
203 // The trace is now complete because the trace session was destroyed
204 fCtfTmfTrace.setComplete(true);
205 TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, new TmfTimeRange(fCtfTmfTrace.getStartTime(), new CtfTmfTimestamp(fTimestampEnd)));
206 fCtfTmfTrace.broadcastAsync(signal);
207 return Status.OK_STATUS;
208 }
209 }
210 }
211 }
212 } catch (IOException e) {
213 Activator.getDefault().logError("Error during live trace reading", e); //$NON-NLS-1$
214 return new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorLiveReading + (e.getMessage() != null ? e.getMessage() : "")); //$NON-NLS-1$
215 }
216
217 return Status.OK_STATUS;
218 }
219 };
220 fConsumerJob.setSystem(true);
221 fConsumerJob.schedule();
222 }
223
224 /**
225 * Dispose the consumer and it's resources (sockets, etc).
226 */
227 public void dispose() {
228 try {
229 if (fConsumerJob != null) {
230 fConsumerJob.cancel();
231 fConsumerJob.join();
232 }
233 if (fConnection != null) {
234 fConnection.close();
235 }
236 if (fRelayd != null) {
237 fRelayd.close();
238 }
239 } catch (IOException e) {
240 // Ignore
241 } catch (InterruptedException e) {
242 // Ignore
243 }
244 }
245
246 /**
247 * Once the consumer is connected to the relayd session, it knows the trace
248 * path. This can be useful to know exactly where the trace is so that it
249 * can be imported into the workspace and it can be opened.
250 *
251 * @return the trace path
252 */
253 public String getTracePath() {
254 return fTracePath;
255 }
256
257 private static String nullTerminatedByteArrayToString(final byte[] byteArray) throws UnsupportedEncodingException {
258 // Find length of null terminated string
259 int length = 0;
260 while (length < byteArray.length && byteArray[length] != 0) {
261 length++;
262 }
263
264 String asessionName = new String(byteArray, 0, length, ENCODING_UTF_8);
265 return asessionName;
266 }
267
268 }
This page took 0.040171 seconds and 5 git commands to generate.