tmf.core: Introduce TmfTimestamp factory methods
[deliverable/tracecompass.git] / lttng / org.eclipse.tracecompass.lttng2.control.ui / src / org / eclipse / tracecompass / internal / lttng2 / control / ui / relayd / LttngRelaydConsumer.java
CommitLineData
6fd3c6e9 1/**********************************************************************
da707390 2 * Copyright (c) 2014, 2015 Ericsson
6fd3c6e9
MAL
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial implementation
11 **********************************************************************/
12
9bc60be7 13package org.eclipse.tracecompass.internal.lttng2.control.ui.relayd;
6fd3c6e9
MAL
14
15import java.io.IOException;
16import java.io.UnsupportedEncodingException;
17import java.net.Socket;
18import java.util.List;
92fe6900
MAL
19import java.util.regex.Matcher;
20import java.util.regex.Pattern;
6fd3c6e9
MAL
21
22import org.eclipse.core.runtime.CoreException;
23import org.eclipse.core.runtime.IProgressMonitor;
24import org.eclipse.core.runtime.IStatus;
25import org.eclipse.core.runtime.Status;
26import org.eclipse.core.runtime.jobs.Job;
9bc60be7 27import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.ILttngRelaydConnector;
41b4bff4
AM
28import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.AttachReturnCode;
29import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.AttachSessionResponse;
30import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.CreateSessionResponse;
31import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.CreateSessionReturnCode;
32import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.IndexResponse;
33import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.NextIndexReturnCode;
34import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.SessionResponse;
35import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.commands.StreamResponse;
a006ee54 36import org.eclipse.tracecompass.internal.lttng2.control.core.relayd.impl.LttngRelaydConnectorFactory;
9bc60be7 37import org.eclipse.tracecompass.internal.lttng2.control.ui.Activator;
2bdf0193 38import org.eclipse.tracecompass.tmf.core.signal.TmfTraceRangeUpdatedSignal;
b2c971ec 39import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
2bdf0193 40import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimeRange;
b2c971ec 41import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimestamp;
9722e5d7 42import org.eclipse.tracecompass.tmf.ctf.core.trace.CtfTmfTrace;
6fd3c6e9
MAL
43
44/**
45 * Consumer of the relay d.
46 *
47 * @author Matthew Khouzam
6fd3c6e9
MAL
48 */
49public final class LttngRelaydConsumer {
50
92fe6900 51 private static final Pattern PROTOCOL_HOST_PATTERN = Pattern.compile("(\\S+://)*(\\d+\\.\\d+\\.\\d+\\.\\d+)"); //$NON-NLS-1$
6fd3c6e9
MAL
52 private static final int SIGNAL_THROTTLE_NANOSEC = 10_000_000;
53 private static final String ENCODING_UTF_8 = "UTF-8"; //$NON-NLS-1$
54
55 private Job fConsumerJob;
56 private CtfTmfTrace fCtfTmfTrace;
6fd3c6e9
MAL
57 private long fTimestampEnd;
58 private AttachSessionResponse fSession;
59 private Socket fConnection;
60 private ILttngRelaydConnector fRelayd;
61 private String fTracePath;
62 private long fLastSignal = 0;
63 private final LttngRelaydConnectionInfo fConnectionInfo;
64
65 /**
66 * Start a lttng consumer.
67 *
68 * @param address
69 * the ip address in string format
70 * @param port
71 * the port, an integer
72 * @param sessionName
73 * the session name
74 * @param project
75 * the default project
76 */
77 LttngRelaydConsumer(final LttngRelaydConnectionInfo connectionInfo) {
78 fConnectionInfo = connectionInfo;
79 fTimestampEnd = 0;
80 }
81
82 /**
83 * Connects to the relayd at the given address and port then attaches to the
84 * given session name.
85 *
86 * @throws CoreException
87 * If something goes wrong during the connection
88 * <ul>
89 * <li>
90 * Connection could not be established (Socket could not be
91 * opened, etc)</li>
92 * <li>
93 * Connection timeout</li>
94 * <li>
95 * The session was not found</li>
96 * <li>
97 * Could not create viewer session</li>
98 * <li>
99 * Invalid trace (no metadata, no streams)</li>
100 * </ul>
101 */
102 public void connect() throws CoreException {
103 if (fConnection != null) {
104 return;
105 }
106
107 try {
92fe6900
MAL
108 Matcher matcher = PROTOCOL_HOST_PATTERN.matcher(fConnectionInfo.getHost());
109 String host = null;
110 if (matcher.matches()) {
111 host = matcher.group(2);
112 }
113
114 if (host == null || host.isEmpty()) {
115 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorConnecting));
116 }
117
118 fConnection = new Socket(host, fConnectionInfo.getPort());
6fd3c6e9
MAL
119 fRelayd = LttngRelaydConnectorFactory.getNewConnector(fConnection);
120 List<SessionResponse> sessions = fRelayd.getSessions();
121 SessionResponse selectedSession = null;
122 for (SessionResponse session : sessions) {
123 String asessionName = nullTerminatedByteArrayToString(session.getSessionName().getBytes());
124
125 if (asessionName.equals(fConnectionInfo.getSessionName())) {
126 selectedSession = session;
127 break;
128 }
129 }
130
131 if (selectedSession == null) {
132 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_SessionNotFound));
133 }
134
135 CreateSessionResponse createSession = fRelayd.createSession();
136 if (createSession.getStatus() != CreateSessionReturnCode.LTTNG_VIEWER_CREATE_SESSION_OK) {
137 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_CreateViewerSessionError + createSession.getStatus().toString()));
138 }
139
140 AttachSessionResponse attachedSession = fRelayd.attachToSession(selectedSession);
141 if (attachedSession.getStatus() != AttachReturnCode.VIEWER_ATTACH_OK) {
142 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_AttachSessionError + attachedSession.getStatus().toString()));
143 }
144
145 String metadata = fRelayd.getMetadata(attachedSession);
146 if (metadata == null) {
147 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoMetadata));
148 }
149
150 List<StreamResponse> attachedStreams = attachedSession.getStreamList();
151 if (attachedStreams.isEmpty()) {
152 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoStreams));
153 }
154
155 fTracePath = nullTerminatedByteArrayToString(attachedStreams.get(0).getPathName().getBytes());
156
157 fSession = attachedSession;
158 } catch (IOException e) {
159 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorConnecting + (e.getMessage() != null ? e.getMessage() : ""))); //$NON-NLS-1$
160 }
161 }
162
163 /**
164 * Run the consumer operation for a give trace.
165 *
166 * @param trace
167 * the trace
168 */
169 public void run(final CtfTmfTrace trace) {
170 if (fSession == null) {
171 return;
172 }
173
174 fCtfTmfTrace = trace;
6fd3c6e9
MAL
175 fConsumerJob = new Job("RelayD consumer") { //$NON-NLS-1$
176
177 @Override
178 protected IStatus run(final IProgressMonitor monitor) {
179 try {
180 while (!monitor.isCanceled()) {
181 List<StreamResponse> attachedStreams = fSession.getStreamList();
182 for (StreamResponse stream : attachedStreams) {
183 if (stream.getMetadataFlag() != 1) {
184 IndexResponse indexReply = fRelayd.getNextIndex(stream);
185 if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_OK) {
fe71057b 186 long nanoTimeStamp = fCtfTmfTrace.timestampCyclesToNanos(indexReply.getTimestampEnd());
6fd3c6e9 187 if (nanoTimeStamp > fTimestampEnd) {
b2c971ec 188 ITmfTimestamp endTime = TmfTimestamp.fromNanos(nanoTimeStamp);
6fd3c6e9
MAL
189 TmfTimeRange range = new TmfTimeRange(fCtfTmfTrace.getStartTime(), endTime);
190
191 long currentTime = System.nanoTime();
192 if (currentTime - fLastSignal > SIGNAL_THROTTLE_NANOSEC) {
193 TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, range);
194 fCtfTmfTrace.broadcastAsync(signal);
195 fLastSignal = currentTime;
196 }
197 fTimestampEnd = nanoTimeStamp;
198 }
199 } else if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_HUP) {
200 // The trace is now complete because the trace session was destroyed
201 fCtfTmfTrace.setComplete(true);
b2c971ec 202 TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, new TmfTimeRange(fCtfTmfTrace.getStartTime(), TmfTimestamp.fromNanos(fTimestampEnd)));
6fd3c6e9
MAL
203 fCtfTmfTrace.broadcastAsync(signal);
204 return Status.OK_STATUS;
205 }
206 }
207 }
208 }
209 } catch (IOException e) {
210 Activator.getDefault().logError("Error during live trace reading", e); //$NON-NLS-1$
211 return new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorLiveReading + (e.getMessage() != null ? e.getMessage() : "")); //$NON-NLS-1$
212 }
213
214 return Status.OK_STATUS;
215 }
216 };
217 fConsumerJob.setSystem(true);
218 fConsumerJob.schedule();
219 }
220
221 /**
222 * Dispose the consumer and it's resources (sockets, etc).
223 */
224 public void dispose() {
225 try {
226 if (fConsumerJob != null) {
227 fConsumerJob.cancel();
228 fConsumerJob.join();
229 }
230 if (fConnection != null) {
231 fConnection.close();
232 }
233 if (fRelayd != null) {
234 fRelayd.close();
235 }
236 } catch (IOException e) {
237 // Ignore
238 } catch (InterruptedException e) {
239 // Ignore
240 }
241 }
242
243 /**
244 * Once the consumer is connected to the relayd session, it knows the trace
245 * path. This can be useful to know exactly where the trace is so that it
246 * can be imported into the workspace and it can be opened.
247 *
248 * @return the trace path
249 */
250 public String getTracePath() {
251 return fTracePath;
252 }
253
254 private static String nullTerminatedByteArrayToString(final byte[] byteArray) throws UnsupportedEncodingException {
255 // Find length of null terminated string
256 int length = 0;
257 while (length < byteArray.length && byteArray[length] != 0) {
258 length++;
259 }
260
261 String asessionName = new String(byteArray, 0, length, ENCODING_UTF_8);
262 return asessionName;
263 }
264
265}
This page took 0.081588 seconds and 5 git commands to generate.