de65c38fddc72ee2262306db92c7e7cc4da9f5a7
[deliverable/tracecompass.git] / lttng / org.eclipse.tracecompass.lttng2.control.ui / src / org / eclipse / tracecompass / internal / lttng2 / control / ui / relayd / LttngRelaydConsumer.java
1 /**********************************************************************
2 * Copyright (c) 2014, 2015 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial implementation
11 **********************************************************************/
12
13 package org.eclipse.tracecompass.internal.lttng2.control.ui.relayd;
14
15 import java.io.IOException;
16 import java.io.UnsupportedEncodingException;
17 import java.net.Socket;
18 import java.util.List;
19 import java.util.regex.Matcher;
20 import java.util.regex.Pattern;
21
22 import org.eclipse.core.runtime.CoreException;
23 import org.eclipse.core.runtime.IProgressMonitor;
24 import org.eclipse.core.runtime.IStatus;
25 import org.eclipse.core.runtime.Status;
26 import org.eclipse.core.runtime.jobs.Job;
27 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.ILttngRelaydConnector;
28 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.AttachReturnCode;
29 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.AttachSessionResponse;
30 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.CreateSessionResponse;
31 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.CreateSessionReturnCode;
32 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.IndexResponse;
33 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.NextIndexReturnCode;
34 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.SessionResponse;
35 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.StreamResponse;
36 import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.impl.LttngRelaydConnectorFactory;
37 import org.eclipse.tracecompass.internal.lttng2.control.ui.Activator;
38 import org.eclipse.tracecompass.tmf.core.signal.TmfTraceRangeUpdatedSignal;
39 import org.eclipse.tracecompass.tmf.core.timestamp.TmfNanoTimestamp;
40 import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimeRange;
41 import org.eclipse.tracecompass.tmf.ctf.core.trace.CtfTmfTrace;
42
43 /**
44 * Consumer of the relay d.
45 *
46 * @author Matthew Khouzam
47 */
48 public final class LttngRelaydConsumer {
49
50 private static final Pattern PROTOCOL_HOST_PATTERN = Pattern.compile("(\\S+://)*(\\d+\\.\\d+\\.\\d+\\.\\d+)"); //$NON-NLS-1$
51 private static final int SIGNAL_THROTTLE_NANOSEC = 10_000_000;
52 private static final String ENCODING_UTF_8 = "UTF-8"; //$NON-NLS-1$
53
54 private Job fConsumerJob;
55 private CtfTmfTrace fCtfTmfTrace;
56 private long fTimestampEnd;
57 private AttachSessionResponse fSession;
58 private Socket fConnection;
59 private ILttngRelaydConnector fRelayd;
60 private String fTracePath;
61 private long fLastSignal = 0;
62 private final LttngRelaydConnectionInfo fConnectionInfo;
63
64 /**
65 * Start a lttng consumer.
66 *
67 * @param address
68 * the ip address in string format
69 * @param port
70 * the port, an integer
71 * @param sessionName
72 * the session name
73 * @param project
74 * the default project
75 */
76 LttngRelaydConsumer(final LttngRelaydConnectionInfo connectionInfo) {
77 fConnectionInfo = connectionInfo;
78 fTimestampEnd = 0;
79 }
80
81 /**
82 * Connects to the relayd at the given address and port then attaches to the
83 * given session name.
84 *
85 * @throws CoreException
86 * If something goes wrong during the connection
87 * <ul>
88 * <li>
89 * Connection could not be established (Socket could not be
90 * opened, etc)</li>
91 * <li>
92 * Connection timeout</li>
93 * <li>
94 * The session was not found</li>
95 * <li>
96 * Could not create viewer session</li>
97 * <li>
98 * Invalid trace (no metadata, no streams)</li>
99 * </ul>
100 */
101 public void connect() throws CoreException {
102 if (fConnection != null) {
103 return;
104 }
105
106 try {
107 Matcher matcher = PROTOCOL_HOST_PATTERN.matcher(fConnectionInfo.getHost());
108 String host = null;
109 if (matcher.matches()) {
110 host = matcher.group(2);
111 }
112
113 if (host == null || host.isEmpty()) {
114 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorConnecting));
115 }
116
117 fConnection = new Socket(host, fConnectionInfo.getPort());
118 fRelayd = LttngRelaydConnectorFactory.getNewConnector(fConnection);
119 List<SessionResponse> sessions = fRelayd.getSessions();
120 SessionResponse selectedSession = null;
121 for (SessionResponse session : sessions) {
122 String asessionName = nullTerminatedByteArrayToString(session.getSessionName().getBytes());
123
124 if (asessionName.equals(fConnectionInfo.getSessionName())) {
125 selectedSession = session;
126 break;
127 }
128 }
129
130 if (selectedSession == null) {
131 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_SessionNotFound));
132 }
133
134 CreateSessionResponse createSession = fRelayd.createSession();
135 if (createSession.getStatus() != CreateSessionReturnCode.LTTNG_VIEWER_CREATE_SESSION_OK) {
136 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_CreateViewerSessionError + createSession.getStatus().toString()));
137 }
138
139 AttachSessionResponse attachedSession = fRelayd.attachToSession(selectedSession);
140 if (attachedSession.getStatus() != AttachReturnCode.VIEWER_ATTACH_OK) {
141 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_AttachSessionError + attachedSession.getStatus().toString()));
142 }
143
144 String metadata = fRelayd.getMetadata(attachedSession);
145 if (metadata == null) {
146 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoMetadata));
147 }
148
149 List<StreamResponse> attachedStreams = attachedSession.getStreamList();
150 if (attachedStreams.isEmpty()) {
151 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoStreams));
152 }
153
154 fTracePath = nullTerminatedByteArrayToString(attachedStreams.get(0).getPathName().getBytes());
155
156 fSession = attachedSession;
157 } catch (IOException e) {
158 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorConnecting + (e.getMessage() != null ? e.getMessage() : ""))); //$NON-NLS-1$
159 }
160 }
161
162 /**
163 * Run the consumer operation for a give trace.
164 *
165 * @param trace
166 * the trace
167 */
168 public void run(final CtfTmfTrace trace) {
169 if (fSession == null) {
170 return;
171 }
172
173 fCtfTmfTrace = trace;
174 fConsumerJob = new Job("RelayD consumer") { //$NON-NLS-1$
175
176 @Override
177 protected IStatus run(final IProgressMonitor monitor) {
178 try {
179 while (!monitor.isCanceled()) {
180 List<StreamResponse> attachedStreams = fSession.getStreamList();
181 for (StreamResponse stream : attachedStreams) {
182 if (stream.getMetadataFlag() != 1) {
183 IndexResponse indexReply = fRelayd.getNextIndex(stream);
184 if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_OK) {
185 long nanoTimeStamp = fCtfTmfTrace.timestampCyclesToNanos(indexReply.getTimestampEnd());
186 if (nanoTimeStamp > fTimestampEnd) {
187 TmfNanoTimestamp endTime = new TmfNanoTimestamp(nanoTimeStamp);
188 TmfTimeRange range = new TmfTimeRange(fCtfTmfTrace.getStartTime(), endTime);
189
190 long currentTime = System.nanoTime();
191 if (currentTime - fLastSignal > SIGNAL_THROTTLE_NANOSEC) {
192 TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, range);
193 fCtfTmfTrace.broadcastAsync(signal);
194 fLastSignal = currentTime;
195 }
196 fTimestampEnd = nanoTimeStamp;
197 }
198 } else if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_HUP) {
199 // The trace is now complete because the trace session was destroyed
200 fCtfTmfTrace.setComplete(true);
201 TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, new TmfTimeRange(fCtfTmfTrace.getStartTime(), new TmfNanoTimestamp(fTimestampEnd)));
202 fCtfTmfTrace.broadcastAsync(signal);
203 return Status.OK_STATUS;
204 }
205 }
206 }
207 }
208 } catch (IOException e) {
209 Activator.getDefault().logError("Error during live trace reading", e); //$NON-NLS-1$
210 return new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorLiveReading + (e.getMessage() != null ? e.getMessage() : "")); //$NON-NLS-1$
211 }
212
213 return Status.OK_STATUS;
214 }
215 };
216 fConsumerJob.setSystem(true);
217 fConsumerJob.schedule();
218 }
219
220 /**
221 * Dispose the consumer and it's resources (sockets, etc).
222 */
223 public void dispose() {
224 try {
225 if (fConsumerJob != null) {
226 fConsumerJob.cancel();
227 fConsumerJob.join();
228 }
229 if (fConnection != null) {
230 fConnection.close();
231 }
232 if (fRelayd != null) {
233 fRelayd.close();
234 }
235 } catch (IOException e) {
236 // Ignore
237 } catch (InterruptedException e) {
238 // Ignore
239 }
240 }
241
242 /**
243 * Once the consumer is connected to the relayd session, it knows the trace
244 * path. This can be useful to know exactly where the trace is so that it
245 * can be imported into the workspace and it can be opened.
246 *
247 * @return the trace path
248 */
249 public String getTracePath() {
250 return fTracePath;
251 }
252
253 private static String nullTerminatedByteArrayToString(final byte[] byteArray) throws UnsupportedEncodingException {
254 // Find length of null terminated string
255 int length = 0;
256 while (length < byteArray.length && byteArray[length] != 0) {
257 length++;
258 }
259
260 String asessionName = new String(byteArray, 0, length, ENCODING_UTF_8);
261 return asessionName;
262 }
263
264 }
This page took 0.039 seconds and 4 git commands to generate.