X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=7d6dc1bc20a03a5af6cafe1e5ef170be1b7f5cb8;hp=d5f04582419dffec1466c60c4de449cf34948a03;hb=9edaf114d28249f4740de16bc9f58c43cfe8042e;hpb=9a9c8637bbfb1b12b8302e03d5bc0453672b4d06 diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index d5f045824..7d6dc1bc2 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -268,8 +268,8 @@ end_unlock: * * Return 0 on success or else a negative value. */ -static int make_viewer_streams(struct relay_session *session, - struct lttng_trace_chunk *viewer_trace_chunk, +static int make_viewer_streams(struct relay_session *relay_session, + struct relay_viewer_session *viewer_session, enum lttng_viewer_seek seek_t, uint32_t *nb_total, uint32_t *nb_unsent, @@ -279,18 +279,19 @@ static int make_viewer_streams(struct relay_session *session, int ret; struct lttng_ht_iter iter; struct ctf_trace *ctf_trace; + struct relay_stream *relay_stream = NULL; - assert(session); - ASSERT_LOCKED(session->lock); + assert(relay_session); + ASSERT_LOCKED(relay_session->lock); - if (!viewer_trace_chunk) { + if (!viewer_session->current_trace_chunk) { ERR("Internal error: viewer session associated with session \"%s\" has a NULL trace chunk", - session->session_name); + relay_session->session_name); ret = -1; goto error; } - if (session->connection_closed) { + if (relay_session->connection_closed) { *closed = true; } @@ -299,10 +300,9 @@ static int make_viewer_streams(struct relay_session *session, * used for a the given session id only. */ rcu_read_lock(); - cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, - node.node) { + cds_lfht_for_each_entry (relay_session->ctf_traces_ht->ht, &iter.iter, + ctf_trace, node.node) { bool trace_has_metadata_stream = false; - struct relay_stream *stream; health_code_update(); @@ -314,15 +314,23 @@ static int make_viewer_streams(struct relay_session *session, * Iterate over all the streams of the trace to see if we have a * metadata stream. */ - cds_list_for_each_entry_rcu( - stream, &ctf_trace->stream_list, stream_node) + cds_list_for_each_entry_rcu(relay_stream, + &ctf_trace->stream_list, stream_node) { - if (stream->is_metadata) { + bool is_metadata_stream; + + pthread_mutex_lock(&relay_stream->lock); + is_metadata_stream = relay_stream->is_metadata; + pthread_mutex_unlock(&relay_stream->lock); + + if (is_metadata_stream) { trace_has_metadata_stream = true; break; } } + relay_stream = NULL; + /* * If there is no metadata stream in this trace at the moment * and we never sent one to the viewer, skip the trace. We @@ -334,35 +342,72 @@ static int make_viewer_streams(struct relay_session *session, continue; } - cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) { - struct relay_viewer_stream *vstream; + cds_list_for_each_entry_rcu(relay_stream, + &ctf_trace->stream_list, stream_node) + { + struct relay_viewer_stream *viewer_stream; - if (!stream_get(stream)) { + if (!stream_get(relay_stream)) { continue; } + + pthread_mutex_lock(&relay_stream->lock); /* * stream published is protected by the session lock. */ - if (!stream->published) { + if (!relay_stream->published) { goto next; } - vstream = viewer_stream_get_by_id(stream->stream_handle); - if (!vstream) { + viewer_stream = viewer_stream_get_by_id( + relay_stream->stream_handle); + if (!viewer_stream) { + struct lttng_trace_chunk *viewer_stream_trace_chunk; + /* * Save that we sent the metadata stream to the * viewer. So that we know what trace the viewer * is aware of. */ - if (stream->is_metadata) { - ctf_trace->metadata_stream_sent_to_viewer = - true; + if (relay_stream->is_metadata) { + ctf_trace->metadata_stream_sent_to_viewer = true; } - vstream = viewer_stream_create(stream, - viewer_trace_chunk, seek_t); - if (!vstream) { + + /* + * If a rotation is ongoing, use a copy of the + * relay stream's chunk to ensure the stream + * files exist. + * + * Otherwise, the viewer session's current trace + * chunk can be used safely. + */ + if ((relay_stream->ongoing_rotation.is_set || + relay_session->ongoing_rotation) && + relay_stream->trace_chunk) { + viewer_stream_trace_chunk = lttng_trace_chunk_copy( + relay_stream->trace_chunk); + if (!viewer_stream_trace_chunk) { + ret = -1; + ctf_trace_put(ctf_trace); + goto error_unlock; + } + } else { + const bool reference_acquired = lttng_trace_chunk_get( + viewer_session->current_trace_chunk); + + assert(reference_acquired); + viewer_stream_trace_chunk = + viewer_session->current_trace_chunk; + } + + viewer_stream = viewer_stream_create( + relay_stream, + viewer_stream_trace_chunk, + seek_t); + lttng_trace_chunk_put(viewer_stream_trace_chunk); + viewer_stream_trace_chunk = NULL; + if (!viewer_stream) { ret = -1; ctf_trace_put(ctf_trace); - stream_put(stream); goto error_unlock; } @@ -374,36 +419,40 @@ static int make_viewer_streams(struct relay_session *session, * Ensure a self-reference is preserved even * after we have put our local reference. */ - if (!viewer_stream_get(vstream)) { + if (!viewer_stream_get(viewer_stream)) { ERR("Unable to get self-reference on viewer stream, logic error."); abort(); } } else { - if (!vstream->sent_flag && nb_unsent) { + if (!viewer_stream->sent_flag && nb_unsent) { /* Update number of unsent stream counter. */ (*nb_unsent)++; } } /* Update number of total stream counter. */ if (nb_total) { - if (stream->is_metadata) { - if (!stream->closed || - stream->metadata_received > vstream->metadata_sent) { + if (relay_stream->is_metadata) { + if (!relay_stream->closed || + relay_stream->metadata_received > + viewer_stream->metadata_sent) { (*nb_total)++; } } else { - if (!stream->closed || - !(((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) >= 0)) { - + if (!relay_stream->closed || + !(((int64_t)(relay_stream->prev_data_seq - + relay_stream->last_net_seq_num)) >= + 0)) { (*nb_total)++; } } } /* Put local reference. */ - viewer_stream_put(vstream); + viewer_stream_put(viewer_stream); next: - stream_put(stream); + pthread_mutex_unlock(&relay_stream->lock); + stream_put(relay_stream); } + relay_stream = NULL; ctf_trace_put(ctf_trace); } @@ -412,6 +461,11 @@ static int make_viewer_streams(struct relay_session *session, error_unlock: rcu_read_unlock(); error: + if (relay_stream) { + pthread_mutex_unlock(&relay_stream->lock); + stream_put(relay_stream); + } + return ret; } @@ -1088,7 +1142,7 @@ int viewer_get_new_streams(struct relay_connection *conn) pthread_mutex_lock(&session->lock); ret = make_viewer_streams(session, - conn->viewer_session->current_trace_chunk, + conn->viewer_session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent, &nb_created, &closed); if (ret < 0) { @@ -1237,7 +1291,7 @@ int viewer_attach_session(struct relay_connection *conn) } ret = make_viewer_streams(session, - conn->viewer_session->current_trace_chunk, seek_type, + conn->viewer_session, seek_type, &nb_streams, NULL, NULL, &closed); if (ret < 0) { goto end_put_session;