static
int make_viewer_streams(struct relay_session *session,
enum lttng_viewer_seek seek_t, uint32_t *nb_total, uint32_t *nb_unsent,
- uint32_t *nb_created)
+ uint32_t *nb_created, bool *closed)
{
int ret;
struct lttng_ht_iter iter;
*/
pthread_mutex_lock(&session->lock);
+ if (session->connection_closed) {
+ *closed = true;
+ }
+
/*
* Create viewer streams for relay streams that are ready to be
* used for a the given session id only.
if (!stream->published) {
goto next;
}
+ /*
+ * Stream has no data, don't consider it yet.
+ */
+ if (stream->is_metadata) {
+ if (!stream->metadata_received) {
+ goto next;
+ }
+ } else {
+ if (stream->prev_seq == -1ULL) {
+ goto next;
+ }
+ }
vstream = viewer_stream_get_by_id(stream->stream_handle);
if (!vstream) {
vstream = viewer_stream_create(stream, seek_t);
/* Update number of created stream counter. */
(*nb_created)++;
}
+ /*
+ * Ensure a self-reference is preserved even
+ * after we have put our local reference.
+ */
+ viewer_stream_get(vstream);
} else {
if (!vstream->sent_flag && nb_unsent) {
/* Update number of unsent stream counter. */
(*nb_unsent)++;
}
- viewer_stream_put(vstream);
}
/* Update number of total stream counter. */
if (nb_total) {
- (*nb_total)++;
+ if (stream->is_metadata) {
+ if (!stream->closed ||
+ stream->metadata_received > vstream->metadata_sent) {
+ (*nb_total)++;
+ }
+ } else {
+ if (!stream->closed ||
+ !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+
+ (*nb_total)++;
+ }
+ }
}
+ /* Put local reference. */
+ viewer_stream_put(vstream);
next:
stream_put(stream);
}
struct lttng_viewer_new_streams_response response;
struct relay_session *session;
uint64_t session_id;
+ bool closed = false;
assert(conn);
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
- &nb_created);
+ &nb_created, &closed);
if (ret < 0) {
goto end_put_session;
}
response.streams_count = htobe32(nb_streams);
/*
- * If the session is closed and we have no new streams to send,
- * it means that the viewer has already received the whole trace
- * for this session and should now close it.
+ * If the session is closed, HUP when there are no more streams
+ * with data.
*/
- if (nb_total == 0 && session->connection_closed) {
+ if (closed && nb_total == 0) {
send_streams = 0;
+ response.streams_count = 0;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
goto send_reply;
}
struct lttng_viewer_attach_session_request request;
struct lttng_viewer_attach_session_response response;
struct relay_session *session = NULL;
+ bool closed = false;
assert(conn);
goto send_reply;
}
- ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL);
+ ret = make_viewer_streams(session, seek_type, &nb_streams, NULL,
+ NULL, &closed);
if (ret < 0) {
goto end_put_session;
}
response.streams_count = htobe32(nb_streams);
+ /*
+ * If the session is closed when the viewer is attaching, it
+ * means some of the streams may have been concurrently removed,
+ * so we don't allow the viewer to attach, even if there are
+ * streams available.
+ */
+ if (closed) {
+ send_streams = 0;
+ response.streams_count = 0;
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
+ goto send_reply;
+ }
+
send_reply:
health_code_update();
ret = send_response(conn->sock, &response, sizeof(response));
vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
if (!vstream) {
- ret = -1;
- goto end;
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+ goto send_reply;
}
/* Use back. ref. Protected by refcounts. */
viewer_index.stream_id = packet_index.stream_id;
send_reply:
- pthread_mutex_unlock(&rstream->lock);
+ if (rstream) {
+ pthread_mutex_unlock(&rstream->lock);
+ }
if (metadata_viewer_stream) {
pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
struct lttng_viewer_get_packet get_packet_info;
struct lttng_viewer_trace_packet reply;
struct relay_viewer_stream *vstream = NULL;
- struct ctf_trace *ctf_trace;
- struct relay_viewer_stream *metadata_viewer_stream = NULL;
DBG2("Relay get data packet");
vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
if (!vstream) {
- goto error;
- }
-
- ctf_trace = vstream->stream->trace;
-
- /* metadata_viewer_stream may be NULL. */
- metadata_viewer_stream =
- ctf_trace_get_viewer_metadata_stream(ctf_trace);
-
- if (metadata_viewer_stream) {
- bool get_packet_err = false;
-
- pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
- DBG("get packet metadata check: recv %" PRIu64 " sent %" PRIu64,
- metadata_viewer_stream->stream->metadata_received,
- metadata_viewer_stream->metadata_sent);
- if (!metadata_viewer_stream->stream->metadata_received ||
- metadata_viewer_stream->stream->metadata_received >
- metadata_viewer_stream->metadata_sent) {
- /*
- * We prevent the client from reading a data stream as
- * long as there is metadata left to consume. This
- * ensures that the client won't receive data of which
- * it can't make sense.
- */
- get_packet_err = true;
- }
- pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
- viewer_stream_put(metadata_viewer_stream);
- if (get_packet_err) {
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
- reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
- goto send_reply_nolock;
- }
+ reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+ goto send_reply_nolock;
}
pthread_mutex_lock(&vstream->stream->lock);
- /*
- * The vstream->stream_fd used here has been opened by
- * get_next_index. It is opened there because this is what
- * allows us to grab a reference to the file with stream lock
- * held, thus protecting us against overwrite caused by
- * tracefile rotation. Since tracefile rotation unlinks the old
- * data file, we are ensured that we won't have our data
- * overwritten under us.
- */
- ret = check_new_streams(conn);
- if (ret < 0) {
- goto end_free;
- } else if (ret == 1) {
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
- reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
- goto send_reply;
- }
len = be32toh(get_packet_info.len);
data = zmalloc(len);
vstream = viewer_stream_get_by_id(be64toh(request.stream_id));
if (!vstream) {
- reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
+ /*
+ * The metadata stream can be closed by a CLOSE command
+ * just before we attach. It can also be closed by
+ * per-pid tracing during tracing. Therefore, it is
+ * possible that we cannot find this viewer stream.
+ * Reply back to the client with an error if we cannot
+ * find it.
+ */
+ reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
goto send_reply;
}
pthread_mutex_lock(&vstream->stream->lock);