1 /**********************************************************************
2 * Copyright (c) 2014 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Matthew Khouzam - Initial implementation
11 **********************************************************************/
13 package org
.eclipse
.tracecompass
.internal
.lttng2
.control
.ui
.relayd
;
15 import java
.io
.IOException
;
16 import java
.io
.UnsupportedEncodingException
;
17 import java
.net
.Socket
;
18 import java
.util
.List
;
20 import org
.eclipse
.core
.runtime
.CoreException
;
21 import org
.eclipse
.core
.runtime
.IProgressMonitor
;
22 import org
.eclipse
.core
.runtime
.IStatus
;
23 import org
.eclipse
.core
.runtime
.Status
;
24 import org
.eclipse
.core
.runtime
.jobs
.Job
;
25 import org
.eclipse
.linuxtools
.ctf
.core
.trace
.CTFTrace
;
26 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.ILttngRelaydConnector
;
27 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.LttngRelaydConnectorFactory
;
28 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.AttachReturnCode
;
29 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.AttachSessionResponse
;
30 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.CreateSessionResponse
;
31 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.CreateSessionReturnCode
;
32 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.IndexResponse
;
33 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.NextIndexReturnCode
;
34 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.SessionResponse
;
35 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.core
.relayd
.lttngviewerCommands
.StreamResponse
;
36 import org
.eclipse
.tracecompass
.internal
.lttng2
.control
.ui
.Activator
;
37 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfTraceRangeUpdatedSignal
;
38 import org
.eclipse
.tracecompass
.tmf
.core
.timestamp
.TmfTimeRange
;
39 import org
.eclipse
.tracecompass
.tmf
.ctf
.core
.CtfTmfTimestamp
;
40 import org
.eclipse
.tracecompass
.tmf
.ctf
.core
.CtfTmfTrace
;
43 * Consumer of the relay d.
45 * @author Matthew Khouzam
48 public final class LttngRelaydConsumer
{
50 private static final int SIGNAL_THROTTLE_NANOSEC
= 10_000_000
;
51 private static final String ENCODING_UTF_8
= "UTF-8"; //$NON-NLS-1$
53 private Job fConsumerJob
;
54 private CtfTmfTrace fCtfTmfTrace
;
55 private CTFTrace fCtfTrace
;
56 private long fTimestampEnd
;
57 private AttachSessionResponse fSession
;
58 private Socket fConnection
;
59 private ILttngRelaydConnector fRelayd
;
60 private String fTracePath
;
61 private long fLastSignal
= 0;
62 private final LttngRelaydConnectionInfo fConnectionInfo
;
65 * Start a lttng consumer.
68 * the ip address in string format
70 * the port, an integer
76 LttngRelaydConsumer(final LttngRelaydConnectionInfo connectionInfo
) {
77 fConnectionInfo
= connectionInfo
;
82 * Connects to the relayd at the given address and port then attaches to the
85 * @throws CoreException
86 * If something goes wrong during the connection
89 * Connection could not be established (Socket could not be
92 * Connection timeout</li>
94 * The session was not found</li>
96 * Could not create viewer session</li>
98 * Invalid trace (no metadata, no streams)</li>
101 public void connect() throws CoreException
{
102 if (fConnection
!= null) {
107 fConnection
= new Socket(fConnectionInfo
.getHost(), fConnectionInfo
.getPort());
108 fRelayd
= LttngRelaydConnectorFactory
.getNewConnector(fConnection
);
109 List
<SessionResponse
> sessions
= fRelayd
.getSessions();
110 SessionResponse selectedSession
= null;
111 for (SessionResponse session
: sessions
) {
112 String asessionName
= nullTerminatedByteArrayToString(session
.getSessionName().getBytes());
114 if (asessionName
.equals(fConnectionInfo
.getSessionName())) {
115 selectedSession
= session
;
120 if (selectedSession
== null) {
121 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_SessionNotFound
));
124 CreateSessionResponse createSession
= fRelayd
.createSession();
125 if (createSession
.getStatus() != CreateSessionReturnCode
.LTTNG_VIEWER_CREATE_SESSION_OK
) {
126 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_CreateViewerSessionError
+ createSession
.getStatus().toString()));
129 AttachSessionResponse attachedSession
= fRelayd
.attachToSession(selectedSession
);
130 if (attachedSession
.getStatus() != AttachReturnCode
.VIEWER_ATTACH_OK
) {
131 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_AttachSessionError
+ attachedSession
.getStatus().toString()));
134 String metadata
= fRelayd
.getMetadata(attachedSession
);
135 if (metadata
== null) {
136 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_NoMetadata
));
139 List
<StreamResponse
> attachedStreams
= attachedSession
.getStreamList();
140 if (attachedStreams
.isEmpty()) {
141 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_NoStreams
));
144 fTracePath
= nullTerminatedByteArrayToString(attachedStreams
.get(0).getPathName().getBytes());
146 fSession
= attachedSession
;
147 } catch (IOException e
) {
148 throw new CoreException(new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_ErrorConnecting
+ (e
.getMessage() != null ? e
.getMessage() : ""))); //$NON-NLS-1$
153 * Run the consumer operation for a give trace.
158 public void run(final CtfTmfTrace trace
) {
159 if (fSession
== null) {
163 fCtfTmfTrace
= trace
;
164 fCtfTrace
= trace
.getCTFTrace();
165 fConsumerJob
= new Job("RelayD consumer") { //$NON-NLS-1$
168 protected IStatus
run(final IProgressMonitor monitor
) {
170 while (!monitor
.isCanceled()) {
171 List
<StreamResponse
> attachedStreams
= fSession
.getStreamList();
172 for (StreamResponse stream
: attachedStreams
) {
173 if (stream
.getMetadataFlag() != 1) {
174 IndexResponse indexReply
= fRelayd
.getNextIndex(stream
);
175 if (indexReply
.getStatus() == NextIndexReturnCode
.VIEWER_INDEX_OK
) {
176 long nanoTimeStamp
= fCtfTrace
.timestampCyclesToNanos(indexReply
.getTimestampEnd());
177 if (nanoTimeStamp
> fTimestampEnd
) {
178 CtfTmfTimestamp endTime
= new CtfTmfTimestamp(nanoTimeStamp
);
179 TmfTimeRange range
= new TmfTimeRange(fCtfTmfTrace
.getStartTime(), endTime
);
181 long currentTime
= System
.nanoTime();
182 if (currentTime
- fLastSignal
> SIGNAL_THROTTLE_NANOSEC
) {
183 TmfTraceRangeUpdatedSignal signal
= new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer
.this, fCtfTmfTrace
, range
);
184 fCtfTmfTrace
.broadcastAsync(signal
);
185 fLastSignal
= currentTime
;
187 fTimestampEnd
= nanoTimeStamp
;
189 } else if (indexReply
.getStatus() == NextIndexReturnCode
.VIEWER_INDEX_HUP
) {
190 // The trace is now complete because the trace session was destroyed
191 fCtfTmfTrace
.setComplete(true);
192 TmfTraceRangeUpdatedSignal signal
= new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer
.this, fCtfTmfTrace
, new TmfTimeRange(fCtfTmfTrace
.getStartTime(), new CtfTmfTimestamp(fTimestampEnd
)));
193 fCtfTmfTrace
.broadcastAsync(signal
);
194 return Status
.OK_STATUS
;
199 } catch (IOException e
) {
200 Activator
.getDefault().logError("Error during live trace reading", e
); //$NON-NLS-1$
201 return new Status(IStatus
.ERROR
, Activator
.PLUGIN_ID
, Messages
.LttngRelaydConsumer_ErrorLiveReading
+ (e
.getMessage() != null ? e
.getMessage() : "")); //$NON-NLS-1$
204 return Status
.OK_STATUS
;
207 fConsumerJob
.setSystem(true);
208 fConsumerJob
.schedule();
212 * Dispose the consumer and it's resources (sockets, etc).
214 public void dispose() {
216 if (fConsumerJob
!= null) {
217 fConsumerJob
.cancel();
220 if (fConnection
!= null) {
223 if (fRelayd
!= null) {
226 } catch (IOException e
) {
228 } catch (InterruptedException e
) {
234 * Once the consumer is connected to the relayd session, it knows the trace
235 * path. This can be useful to know exactly where the trace is so that it
236 * can be imported into the workspace and it can be opened.
238 * @return the trace path
240 public String
getTracePath() {
244 private static String
nullTerminatedByteArrayToString(final byte[] byteArray
) throws UnsupportedEncodingException
{
245 // Find length of null terminated string
247 while (length
< byteArray
.length
&& byteArray
[length
] != 0) {
251 String asessionName
= new String(byteArray
, 0, length
, ENCODING_UTF_8
);