1 /**********************************************************************
2 * Copyright (c) 2014 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Matthew Khouzam - Initial implementation
11 **********************************************************************/
13 package org
.eclipse
.tracecompass
.internal
.lttng2
.control
.ui
.relayd
;
15 import java
.io
.IOException
;
16 import java
.io
.UnsupportedEncodingException
;
17 import java
.net
.Socket
;
18 import java
.util
.List
;
19 import java
.util
.regex
.Matcher
;
20 import java
.util
.regex
.Pattern
;
22 import org
.eclipse
.core
.runtime
.CoreException
;
23 import org
.eclipse
.core
.runtime
.IProgressMonitor
;
24 import org
.eclipse
.core
.runtime
.IStatus
;
25 import org
.eclipse
.core
.runtime
.Status
;
26 import org
.eclipse
.core
.runtime
.jobs
.Job
;
27 import org
.eclipse
.tracecompass
.ctf
.core
.trace
.CTFTrace
;
28 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.ILttngRelaydConnector
;
29 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.LttngRelaydConnectorFactory
;
30 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.AttachReturnCode
;
31 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.AttachSessionResponse
;
32 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.CreateSessionResponse
;
33 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.CreateSessionReturnCode
;
34 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.IndexResponse
;
35 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.NextIndexReturnCode
;
36 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.SessionResponse
;
37 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.StreamResponse
;
38 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.ui
.Activator
;
39 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfTraceRangeUpdatedSignal
;
40 import org
.eclipse
.tracecompass
.tmf
.core
.timestamp
.TmfTimeRange
;
41 import org
.eclipse
.tracecompass
.tmf
.ctf
.core
.CtfTmfTimestamp
;
42 import org
.eclipse
.tracecompass
.tmf
.ctf
.core
.CtfTmfTrace
;
45 * Consumer of the relay d.
47 * @author Matthew Khouzam
50 public final class LttngRelaydConsumer
{
52 private static final Pattern PROTOCOL_HOST_PATTERN
= Pattern
.compile("(\\S+://)*(\\d+\\.\\d+\\.\\d+\\.\\d+)"); //$NON-NLS-1$
53 private static final int SIGNAL_THROTTLE_NANOSEC
= 10_000_000
;
54 private static final String ENCODING_UTF_8
= "UTF-8"; //$NON-NLS-1$
56 private Job fConsumerJob
;
57 private CtfTmfTrace fCtfTmfTrace
;
58 private CTFTrace fCtfTrace
;
59 private long fTimestampEnd
;
60 private AttachSessionResponse fSession
;
61 private Socket fConnection
;
62 private ILttngRelaydConnector fRelayd
;
63 private String fTracePath
;
64 private long fLastSignal
= 0;
65 private final LttngRelaydConnectionInfo fConnectionInfo
;
68 * Start a lttng consumer.
71 * the ip address in string format
73 * the port, an integer
79 LttngRelaydConsumer(final LttngRelaydConnectionInfo connectionInfo
) {
80 fConnectionInfo
= connectionInfo
;
85 * Connects to the relayd at the given address and port then attaches to the
88 * @throws CoreException
89 * If something goes wrong during the connection
92 * Connection could not be established (Socket could not be
95 * Connection timeout</li>
97 * The session was not found</li>
99 * Could not create viewer session</li>
101 * Invalid trace (no metadata, no streams)</li>
104 public void connect() throws CoreException
{
105 if (fConnection
!= null) {
110 Matcher matcher
= PROTOCOL_HOST_PATTERN
.matcher(fConnectionInfo
.getHost());
112 if (matcher
.matches()) {
113 host
= matcher
.group(2);
116 if (host
== null || host
.isEmpty()) {
117 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_ErrorConnecting
));
120 fConnection
= new Socket(host
, fConnectionInfo
.getPort());
121 fRelayd
= LttngRelaydConnectorFactory
.getNewConnector(fConnection
);
122 List
<SessionResponse
> sessions
= fRelayd
.getSessions();
123 SessionResponse selectedSession
= null;
124 for (SessionResponse session
: sessions
) {
125 String asessionName
= nullTerminatedByteArrayToString(session
.getSessionName().getBytes());
127 if (asessionName
.equals(fConnectionInfo
.getSessionName())) {
128 selectedSession
= session
;
133 if (selectedSession
== null) {
134 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_SessionNotFound
));
137 CreateSessionResponse createSession
= fRelayd
.createSession();
138 if (createSession
.getStatus() != CreateSessionReturnCode
.LTTNG_VIEWER_CREATE_SESSION_OK
) {
139 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_CreateViewerSessionError
+ createSession
.getStatus().toString()));
142 AttachSessionResponse attachedSession
= fRelayd
.attachToSession(selectedSession
);
143 if (attachedSession
.getStatus() != AttachReturnCode
.VIEWER_ATTACH_OK
) {
144 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_AttachSessionError
+ attachedSession
.getStatus().toString()));
147 String metadata
= fRelayd
.getMetadata(attachedSession
);
148 if (metadata
== null) {
149 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_NoMetadata
));
152 List
<StreamResponse
> attachedStreams
= attachedSession
.getStreamList();
153 if (attachedStreams
.isEmpty()) {
154 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_NoStreams
));
157 fTracePath
= nullTerminatedByteArrayToString(attachedStreams
.get(0).getPathName().getBytes());
159 fSession
= attachedSession
;
160 } catch (IOException e
) {
161 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_ErrorConnecting
+ (e
.getMessage() != null ? e
.getMessage() : ""))); //$NON-NLS-1$
166 * Run the consumer operation for a give trace.
171 public void run(final CtfTmfTrace trace
) {
172 if (fSession
== null) {
176 fCtfTmfTrace
= trace
;
177 fCtfTrace
= trace
.getCTFTrace();
178 fConsumerJob
= new Job("RelayD consumer") { //$NON-NLS-1$
181 protected IStatus
run(final IProgressMonitor monitor
) {
183 while (!monitor
.isCanceled()) {
184 List
<StreamResponse
> attachedStreams
= fSession
.getStreamList();
185 for (StreamResponse stream
: attachedStreams
) {
186 if (stream
.getMetadataFlag() != 1) {
187 IndexResponse indexReply
= fRelayd
.getNextIndex(stream
);
188 if (indexReply
.getStatus() == NextIndexReturnCode
.VIEWER_INDEX_OK
) {
189 long nanoTimeStamp
= fCtfTrace
.timestampCyclesToNanos(indexReply
.getTimestampEnd());
190 if (nanoTimeStamp
> fTimestampEnd
) {
191 CtfTmfTimestamp endTime
= new CtfTmfTimestamp(nanoTimeStamp
);
192 TmfTimeRange range
= new TmfTimeRange(fCtfTmfTrace
.getStartTime(), endTime
);
194 long currentTime
= System
.nanoTime();
195 if (currentTime
- fLastSignal
> SIGNAL_THROTTLE_NANOSEC
) {
196 TmfTraceRangeUpdatedSignal signal
= new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer
.this, fCtfTmfTrace
, range
);
197 fCtfTmfTrace
.broadcastAsync(signal
);
198 fLastSignal
= currentTime
;
200 fTimestampEnd
= nanoTimeStamp
;
202 } else if (indexReply
.getStatus() == NextIndexReturnCode
.VIEWER_INDEX_HUP
) {
203 // The trace is now complete because the trace session was destroyed
204 fCtfTmfTrace
.setComplete(true);
205 TmfTraceRangeUpdatedSignal signal
= new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer
.this, fCtfTmfTrace
, new TmfTimeRange(fCtfTmfTrace
.getStartTime(), new CtfTmfTimestamp(fTimestampEnd
)));
206 fCtfTmfTrace
.broadcastAsync(signal
);
207 return Status
.OK_STATUS
;
212 } catch (IOException e
) {
213 Activator
.getDefault().logError("Error during live trace reading", e
); //$NON-NLS-1$
214 return new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_ErrorLiveReading
+ (e
.getMessage() != null ? e
.getMessage() : "")); //$NON-NLS-1$
217 return Status
.OK_STATUS
;
220 fConsumerJob
.setSystem(true);
221 fConsumerJob
.schedule();
225 * Dispose the consumer and it's resources (sockets, etc).
227 public void dispose() {
229 if (fConsumerJob
!= null) {
230 fConsumerJob
.cancel();
233 if (fConnection
!= null) {
236 if (fRelayd
!= null) {
239 } catch (IOException e
) {
241 } catch (InterruptedException e
) {
247 * Once the consumer is connected to the relayd session, it knows the trace
248 * path. This can be useful to know exactly where the trace is so that it
249 * can be imported into the workspace and it can be opened.
251 * @return the trace path
253 public String
getTracePath() {
257 private static String
nullTerminatedByteArrayToString(final byte[] byteArray
) throws UnsupportedEncodingException
{
258 // Find length of null terminated string
260 while (length
< byteArray
.length
&& byteArray
[length
] != 0) {
264 String asessionName
= new String(byteArray
, 0, length
, ENCODING_UTF_8
);