A reference to the local ctx for the socket pair is used to "force" an
evaluation of the data and metadata streams, since we changed the endpoint
status. This mostly results in the closing of all the streams to which
the relayd socket pair is linked.
Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
ret = relayd_send_close_stream(&relayd->control_sock,
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
ret = relayd_send_close_stream(&relayd->control_sock,
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- DBG("Unable to close stream on the relayd. Continuing");
- /*
- * Continue here. There is nothing we can do for the relayd.
- * Chances are that the relayd has closed the socket so we just
- * continue cleaning up.
- */
+ ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_send_index(&relayd->control_sock, element,
stream->relayd_stream_id, stream->next_net_seq_num - 1);
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_send_index(&relayd->control_sock, element,
stream->relayd_stream_id, stream->next_net_seq_num - 1);
+ if (ret < 0) {
+ /*
+ * Communication error with lttng-relayd,
+ * perform cleanup now
+ */
+ ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ ret = -1;
+ }
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
* If a local data context is available, notify the threads that the streams'
* state have changed.
*/
* If a local data context is available, notify the threads that the streams'
* state have changed.
*/
-static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
- struct lttng_consumer_local_data *ctx)
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
{
uint64_t netidx;
assert(relayd);
{
uint64_t netidx;
assert(relayd);
- DBG("Cleaning up relayd sockets");
+ DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);
/* Save the net sequence index before destroying the object */
netidx = relayd->net_seq_idx;
/* Save the net sequence index before destroying the object */
netidx = relayd->net_seq_idx;
* memory barrier ordering the updates of the end point status from the
* read of this status which happens AFTER receiving this notify.
*/
* memory barrier ordering the updates of the end point status from the
* read of this status which happens AFTER receiving this notify.
*/
- if (ctx) {
- notify_thread_lttng_pipe(ctx->consumer_data_pipe);
- notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
- }
+ notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
+ notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
ret = relayd_add_stream(&relayd->control_sock, stream->name,
path, &stream->relayd_stream_id,
stream->chan->tracefile_size, stream->chan->tracefile_count);
ret = relayd_add_stream(&relayd->control_sock, stream->name,
path, &stream->relayd_stream_id,
stream->chan->tracefile_size, stream->chan->tracefile_count);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
uatomic_inc(&relayd->refcount);
stream->sent_to_relayd = 1;
uatomic_inc(&relayd->refcount);
stream->sent_to_relayd = 1;
/* Add stream on the relayd */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_streams_sent(&relayd->control_sock);
/* Add stream on the relayd */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_streams_sent(&relayd->control_sock);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
net_seq_idx);
} else {
ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
net_seq_idx);
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- cleanup_relayd(relayd, ctx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- cleanup_relayd(relayd, ctx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
/* Skip splice error so the consumer does not fail */
goto end;
}
/* Skip splice error so the consumer does not fail */
goto end;
}
* Add relayd socket pair to consumer data hashtable. If object already
* exists or on error, the function gracefully returns.
*/
* Add relayd socket pair to consumer data hashtable. If object already
* exists or on error, the function gracefully returns.
*/
add_relayd(relayd);
/* All good! */
add_relayd(relayd);
/* All good! */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_begin_data_pending(&relayd->control_sock,
relayd->relayd_session_id);
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_begin_data_pending(&relayd->control_sock,
relayd->relayd_session_id);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
/* Communication error thus the relayd so no data pending. */
if (ret < 0) {
/* Communication error thus the relayd so no data pending. */
+ ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
cds_lfht_for_each_entry_duplicate(ht->ht,
}
cds_lfht_for_each_entry_duplicate(ht->ht,
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
}
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
}
+ if (ret < 0) {
+ ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_pending;
+ }
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_end_data_pending(&relayd->control_sock,
relayd->relayd_session_id, &is_data_inflight);
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_end_data_pending(&relayd->control_sock,
relayd->relayd_session_id, &is_data_inflight);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (is_data_inflight) {
goto data_pending;
}
if (is_data_inflight) {
goto data_pending;
}
/* Session id on both sides for the sockets. */
uint64_t relayd_session_id;
uint64_t sessiond_session_id;
/* Session id on both sides for the sockets. */
uint64_t relayd_session_id;
uint64_t sessiond_session_id;
+ struct lttng_consumer_local_data *ctx;
int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
int consumer_create_index_file(struct lttng_consumer_stream *stream);
int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
int consumer_create_index_file(struct lttng_consumer_stream *stream);
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
#endif /* LIB_CONSUMER_H */
#endif /* LIB_CONSUMER_H */