1 /**********************************************************************
2 * Copyright (c) 2014, 2015 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Matthew Khouzam - Initial implementation
11 **********************************************************************/
13 package org
.eclipse
.tracecompass
.internal
.lttng2
.control
.ui
.relayd
;
15 import java
.io
.IOException
;
16 import java
.io
.UnsupportedEncodingException
;
17 import java
.net
.Socket
;
18 import java
.util
.List
;
19 import java
.util
.regex
.Matcher
;
20 import java
.util
.regex
.Pattern
;
22 import org
.eclipse
.core
.runtime
.CoreException
;
23 import org
.eclipse
.core
.runtime
.IProgressMonitor
;
24 import org
.eclipse
.core
.runtime
.IStatus
;
25 import org
.eclipse
.core
.runtime
.Status
;
26 import org
.eclipse
.core
.runtime
.jobs
.Job
;
27 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.ILttngRelaydConnector
;
28 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.commands
.AttachReturnCode
;
29 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.commands
.AttachSessionResponse
;
30 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.commands
.CreateSessionResponse
;
31 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.commands
.CreateSessionReturnCode
;
32 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.commands
.IndexResponse
;
33 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.commands
.NextIndexReturnCode
;
34 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.commands
.SessionResponse
;
35 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.commands
.StreamResponse
;
36 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.impl
.LttngRelaydConnectorFactory
;
37 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.ui
.Activator
;
38 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfTraceRangeUpdatedSignal
;
39 import org
.eclipse
.tracecompass
.tmf
.core
.timestamp
.TmfNanoTimestamp
;
40 import org
.eclipse
.tracecompass
.tmf
.core
.timestamp
.TmfTimeRange
;
41 import org
.eclipse
.tracecompass
.tmf
.ctf
.core
.trace
.CtfTmfTrace
;
/**
 * Consumer of the relayd.
 *
 * @author Matthew Khouzam
 */
48 public final class LttngRelaydConsumer
{
50 private static final Pattern PROTOCOL_HOST_PATTERN
= Pattern
.compile("(\\S+://)*(\\d+\\.\\d+\\.\\d+\\.\\d+)"); //$NON-NLS-1$
51 private static final int SIGNAL_THROTTLE_NANOSEC
= 10_000_000
;
52 private static final String ENCODING_UTF_8
= "UTF-8"; //$NON-NLS-1$
54 private Job fConsumerJob
;
55 private CtfTmfTrace fCtfTmfTrace
;
56 private long fTimestampEnd
;
57 private AttachSessionResponse fSession
;
58 private Socket fConnection
;
59 private ILttngRelaydConnector fRelayd
;
60 private String fTracePath
;
61 private long fLastSignal
= 0;
62 private final LttngRelaydConnectionInfo fConnectionInfo
;
/**
 * Creates a lttng consumer for the given connection information. Nothing is
 * opened here; the information is only stored for a later
 * {@code connect()} call, which reads its host, port and session name.
 *
 * @param connectionInfo
 *            the connection information of the relayd session to consume
 */
LttngRelaydConsumer(final LttngRelaydConnectionInfo connectionInfo) {
    this.fConnectionInfo = connectionInfo;
}
82 * Connects to the relayd at the given address and port then attaches to the
85 * @throws CoreException
86 * If something goes wrong during the connection
89 * Connection could not be established (Socket could not be
92 * Connection timeout</li>
94 * The session was not found</li>
96 * Could not create viewer session</li>
98 * Invalid trace (no metadata, no streams)</li>
101 public void connect() throws CoreException
{
102 if (fConnection
!= null) {
107 Matcher matcher
= PROTOCOL_HOST_PATTERN
.matcher(fConnectionInfo
.getHost());
109 if (matcher
.matches()) {
110 host
= matcher
.group(2);
113 if (host
== null || host
.isEmpty()) {
114 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_ErrorConnecting
));
117 fConnection
= new Socket(host
, fConnectionInfo
.getPort());
118 fRelayd
= LttngRelaydConnectorFactory
.getNewConnector(fConnection
);
119 List
<SessionResponse
> sessions
= fRelayd
.getSessions();
120 SessionResponse selectedSession
= null;
121 for (SessionResponse session
: sessions
) {
122 String asessionName
= nullTerminatedByteArrayToString(session
.getSessionName().getBytes());
124 if (asessionName
.equals(fConnectionInfo
.getSessionName())) {
125 selectedSession
= session
;
130 if (selectedSession
== null) {
131 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_SessionNotFound
));
134 CreateSessionResponse createSession
= fRelayd
.createSession();
135 if (createSession
.getStatus() != CreateSessionReturnCode
.LTTNG_VIEWER_CREATE_SESSION_OK
) {
136 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_CreateViewerSessionError
+ createSession
.getStatus().toString()));
139 AttachSessionResponse attachedSession
= fRelayd
.attachToSession(selectedSession
);
140 if (attachedSession
.getStatus() != AttachReturnCode
.VIEWER_ATTACH_OK
) {
141 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_AttachSessionError
+ attachedSession
.getStatus().toString()));
144 String metadata
= fRelayd
.getMetadata(attachedSession
);
145 if (metadata
== null) {
146 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_NoMetadata
));
149 List
<StreamResponse
> attachedStreams
= attachedSession
.getStreamList();
150 if (attachedStreams
.isEmpty()) {
151 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_NoStreams
));
154 fTracePath
= nullTerminatedByteArrayToString(attachedStreams
.get(0).getPathName().getBytes());
156 fSession
= attachedSession
;
157 } catch (IOException e
) {
158 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_ErrorConnecting
+ (e
.getMessage() != null ? e
.getMessage() : ""))); //$NON-NLS-1$
163 * Run the consumer operation for a give trace.
168 public void run(final CtfTmfTrace trace
) {
169 if (fSession
== null) {
173 fCtfTmfTrace
= trace
;
174 fConsumerJob
= new Job("RelayD consumer") { //$NON-NLS-1$
177 protected IStatus
run(final IProgressMonitor monitor
) {
179 while (!monitor
.isCanceled()) {
180 List
<StreamResponse
> attachedStreams
= fSession
.getStreamList();
181 for (StreamResponse stream
: attachedStreams
) {
182 if (stream
.getMetadataFlag() != 1) {
183 IndexResponse indexReply
= fRelayd
.getNextIndex(stream
);
184 if (indexReply
.getStatus() == NextIndexReturnCode
.VIEWER_INDEX_OK
) {
185 long nanoTimeStamp
= fCtfTmfTrace
.timestampCyclesToNanos(indexReply
.getTimestampEnd());
186 if (nanoTimeStamp
> fTimestampEnd
) {
187 TmfNanoTimestamp endTime
= new TmfNanoTimestamp(nanoTimeStamp
);
188 TmfTimeRange range
= new TmfTimeRange(fCtfTmfTrace
.getStartTime(), endTime
);
190 long currentTime
= System
.nanoTime();
191 if (currentTime
- fLastSignal
> SIGNAL_THROTTLE_NANOSEC
) {
192 TmfTraceRangeUpdatedSignal signal
= new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer
.this, fCtfTmfTrace
, range
);
193 fCtfTmfTrace
.broadcastAsync(signal
);
194 fLastSignal
= currentTime
;
196 fTimestampEnd
= nanoTimeStamp
;
198 } else if (indexReply
.getStatus() == NextIndexReturnCode
.VIEWER_INDEX_HUP
) {
199 // The trace is now complete because the trace session was destroyed
200 fCtfTmfTrace
.setComplete(true);
201 TmfTraceRangeUpdatedSignal signal
= new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer
.this, fCtfTmfTrace
, new TmfTimeRange(fCtfTmfTrace
.getStartTime(), new TmfNanoTimestamp(fTimestampEnd
)));
202 fCtfTmfTrace
.broadcastAsync(signal
);
203 return Status
.OK_STATUS
;
208 } catch (IOException e
) {
209 Activator
.getDefault().logError("Error during live trace reading", e
); //$NON-NLS-1$
210 return new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_ErrorLiveReading
+ (e
.getMessage() != null ? e
.getMessage() : "")); //$NON-NLS-1$
213 return Status
.OK_STATUS
;
216 fConsumerJob
.setSystem(true);
217 fConsumerJob
.schedule();
/**
 * Dispose the consumer and its resources (sockets, etc).
 */
// Releases what the consumer holds. Only the cancellation of the consumer job
// is visible in this view; see the review notes below for the parts that are not.
223 public void dispose() {
// Cancel the background consumer job, if one was ever created.
225 if (fConsumerJob
!= null) {
226 fConsumerJob
.cancel();
// NOTE(review): the statements guarded by the next two null checks are not
// visible in this view. The IOException handler below suggests they release
// the socket and the connector - confirm against the full file.
229 if (fConnection
!= null) {
232 if (fRelayd
!= null) {
// NOTE(review): both handler bodies are not visible in this view; if they are
// empty, failures during disposal are silently dropped - confirm that is intended.
235 } catch (IOException e
) {
// NOTE(review): an InterruptedException handler implies a blocking call (not
// visible here) in the guarded section; presumably a wait on the job - verify.
237 } catch (InterruptedException e
) {
243 * Once the consumer is connected to the relayd session, it knows the trace
244 * path. This can be useful to know exactly where the trace is so that it
245 * can be imported into the workspace and it can be opened.
247 * @return the trace path
249 public String
getTracePath() {
253 private static String
nullTerminatedByteArrayToString(final byte[] byteArray
) throws UnsupportedEncodingException
{
254 // Find length of null terminated string
256 while (length
< byteArray
.length
&& byteArray
[length
] != 0) {
260 String asessionName
= new String(byteArray
, 0, length
, ENCODING_UTF_8
);