tmf: Rename packages to org.eclipse.tracecompass.*
[deliverable/tracecompass.git] / org.eclipse.tracecompass.lttng2.control.ui / src / org / eclipse / linuxtools / internal / lttng2 / control / ui / relayd / LttngRelaydConsumer.java
CommitLineData
6fd3c6e9
MAL
1/**********************************************************************
2 * Copyright (c) 2014 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial implementation
11 **********************************************************************/
12
13package org.eclipse.linuxtools.internal.lttng2.control.ui.relayd;
14
15import java.io.IOException;
16import java.io.UnsupportedEncodingException;
17import java.net.Socket;
18import java.util.List;
19
20import org.eclipse.core.runtime.CoreException;
21import org.eclipse.core.runtime.IProgressMonitor;
22import org.eclipse.core.runtime.IStatus;
23import org.eclipse.core.runtime.Status;
24import org.eclipse.core.runtime.jobs.Job;
25import org.eclipse.linuxtools.ctf.core.trace.CTFTrace;
26import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.ILttngRelaydConnector;
27import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.LttngRelaydConnectorFactory;
28import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.AttachReturnCode;
29import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.AttachSessionResponse;
30import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.CreateSessionResponse;
31import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.CreateSessionReturnCode;
32import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.IndexResponse;
33import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.NextIndexReturnCode;
34import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.SessionResponse;
35import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.StreamResponse;
36import org.eclipse.linuxtools.internal.lttng2.control.ui.Activator;
2bdf0193
AM
37import org.eclipse.tracecompass.tmf.core.signal.TmfTraceRangeUpdatedSignal;
38import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimeRange;
39import org.eclipse.tracecompass.tmf.ctf.core.CtfTmfTimestamp;
40import org.eclipse.tracecompass.tmf.ctf.core.CtfTmfTrace;
6fd3c6e9
MAL
41
42/**
43 * Consumer of the relay d.
44 *
45 * @author Matthew Khouzam
46 * @since 3.1
47 */
48public final class LttngRelaydConsumer {
49
50 private static final int SIGNAL_THROTTLE_NANOSEC = 10_000_000;
51 private static final String ENCODING_UTF_8 = "UTF-8"; //$NON-NLS-1$
52
53 private Job fConsumerJob;
54 private CtfTmfTrace fCtfTmfTrace;
55 private CTFTrace fCtfTrace;
56 private long fTimestampEnd;
57 private AttachSessionResponse fSession;
58 private Socket fConnection;
59 private ILttngRelaydConnector fRelayd;
60 private String fTracePath;
61 private long fLastSignal = 0;
62 private final LttngRelaydConnectionInfo fConnectionInfo;
63
64 /**
65 * Start a lttng consumer.
66 *
67 * @param address
68 * the ip address in string format
69 * @param port
70 * the port, an integer
71 * @param sessionName
72 * the session name
73 * @param project
74 * the default project
75 */
76 LttngRelaydConsumer(final LttngRelaydConnectionInfo connectionInfo) {
77 fConnectionInfo = connectionInfo;
78 fTimestampEnd = 0;
79 }
80
81 /**
82 * Connects to the relayd at the given address and port then attaches to the
83 * given session name.
84 *
85 * @throws CoreException
86 * If something goes wrong during the connection
87 * <ul>
88 * <li>
89 * Connection could not be established (Socket could not be
90 * opened, etc)</li>
91 * <li>
92 * Connection timeout</li>
93 * <li>
94 * The session was not found</li>
95 * <li>
96 * Could not create viewer session</li>
97 * <li>
98 * Invalid trace (no metadata, no streams)</li>
99 * </ul>
100 */
101 public void connect() throws CoreException {
102 if (fConnection != null) {
103 return;
104 }
105
106 try {
107 fConnection = new Socket(fConnectionInfo.getHost(), fConnectionInfo.getPort());
108 fRelayd = LttngRelaydConnectorFactory.getNewConnector(fConnection);
109 List<SessionResponse> sessions = fRelayd.getSessions();
110 SessionResponse selectedSession = null;
111 for (SessionResponse session : sessions) {
112 String asessionName = nullTerminatedByteArrayToString(session.getSessionName().getBytes());
113
114 if (asessionName.equals(fConnectionInfo.getSessionName())) {
115 selectedSession = session;
116 break;
117 }
118 }
119
120 if (selectedSession == null) {
121 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_SessionNotFound));
122 }
123
124 CreateSessionResponse createSession = fRelayd.createSession();
125 if (createSession.getStatus() != CreateSessionReturnCode.LTTNG_VIEWER_CREATE_SESSION_OK) {
126 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_CreateViewerSessionError + createSession.getStatus().toString()));
127 }
128
129 AttachSessionResponse attachedSession = fRelayd.attachToSession(selectedSession);
130 if (attachedSession.getStatus() != AttachReturnCode.VIEWER_ATTACH_OK) {
131 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_AttachSessionError + attachedSession.getStatus().toString()));
132 }
133
134 String metadata = fRelayd.getMetadata(attachedSession);
135 if (metadata == null) {
136 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoMetadata));
137 }
138
139 List<StreamResponse> attachedStreams = attachedSession.getStreamList();
140 if (attachedStreams.isEmpty()) {
141 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoStreams));
142 }
143
144 fTracePath = nullTerminatedByteArrayToString(attachedStreams.get(0).getPathName().getBytes());
145
146 fSession = attachedSession;
147 } catch (IOException e) {
148 throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorConnecting + (e.getMessage() != null ? e.getMessage() : ""))); //$NON-NLS-1$
149 }
150 }
151
152 /**
153 * Run the consumer operation for a give trace.
154 *
155 * @param trace
156 * the trace
157 */
158 public void run(final CtfTmfTrace trace) {
159 if (fSession == null) {
160 return;
161 }
162
163 fCtfTmfTrace = trace;
164 fCtfTrace = trace.getCTFTrace();
165 fConsumerJob = new Job("RelayD consumer") { //$NON-NLS-1$
166
167 @Override
168 protected IStatus run(final IProgressMonitor monitor) {
169 try {
170 while (!monitor.isCanceled()) {
171 List<StreamResponse> attachedStreams = fSession.getStreamList();
172 for (StreamResponse stream : attachedStreams) {
173 if (stream.getMetadataFlag() != 1) {
174 IndexResponse indexReply = fRelayd.getNextIndex(stream);
175 if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_OK) {
176 long nanoTimeStamp = fCtfTrace.timestampCyclesToNanos(indexReply.getTimestampEnd());
177 if (nanoTimeStamp > fTimestampEnd) {
178 CtfTmfTimestamp endTime = new CtfTmfTimestamp(nanoTimeStamp);
179 TmfTimeRange range = new TmfTimeRange(fCtfTmfTrace.getStartTime(), endTime);
180
181 long currentTime = System.nanoTime();
182 if (currentTime - fLastSignal > SIGNAL_THROTTLE_NANOSEC) {
183 TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, range);
184 fCtfTmfTrace.broadcastAsync(signal);
185 fLastSignal = currentTime;
186 }
187 fTimestampEnd = nanoTimeStamp;
188 }
189 } else if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_HUP) {
190 // The trace is now complete because the trace session was destroyed
191 fCtfTmfTrace.setComplete(true);
192 TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, new TmfTimeRange(fCtfTmfTrace.getStartTime(), new CtfTmfTimestamp(fTimestampEnd)));
193 fCtfTmfTrace.broadcastAsync(signal);
194 return Status.OK_STATUS;
195 }
196 }
197 }
198 }
199 } catch (IOException e) {
200 Activator.getDefault().logError("Error during live trace reading", e); //$NON-NLS-1$
201 return new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorLiveReading + (e.getMessage() != null ? e.getMessage() : "")); //$NON-NLS-1$
202 }
203
204 return Status.OK_STATUS;
205 }
206 };
207 fConsumerJob.setSystem(true);
208 fConsumerJob.schedule();
209 }
210
211 /**
212 * Dispose the consumer and it's resources (sockets, etc).
213 */
214 public void dispose() {
215 try {
216 if (fConsumerJob != null) {
217 fConsumerJob.cancel();
218 fConsumerJob.join();
219 }
220 if (fConnection != null) {
221 fConnection.close();
222 }
223 if (fRelayd != null) {
224 fRelayd.close();
225 }
226 } catch (IOException e) {
227 // Ignore
228 } catch (InterruptedException e) {
229 // Ignore
230 }
231 }
232
233 /**
234 * Once the consumer is connected to the relayd session, it knows the trace
235 * path. This can be useful to know exactly where the trace is so that it
236 * can be imported into the workspace and it can be opened.
237 *
238 * @return the trace path
239 */
240 public String getTracePath() {
241 return fTracePath;
242 }
243
244 private static String nullTerminatedByteArrayToString(final byte[] byteArray) throws UnsupportedEncodingException {
245 // Find length of null terminated string
246 int length = 0;
247 while (length < byteArray.length && byteArray[length] != 0) {
248 length++;
249 }
250
251 String asessionName = new String(byteArray, 0, length, ENCODING_UTF_8);
252 return asessionName;
253 }
254
255}
This page took 0.036621 seconds and 5 git commands to generate.