ret = relayd_add_stream(&relayd->control_sock, stream->name,
path, &stream->relayd_stream_id,
stream->chan->tracefile_size, stream->chan->tracefile_count);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto end;
}
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
uatomic_inc(&relayd->refcount);
stream->sent_to_relayd = 1;
/* Add stream on the relayd */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_streams_sent(&relayd->control_sock);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto end;
}
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
net_seq_idx);
return ret;
}
-/*
- * Try to lock the stream mutex.
- *
- * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
- */
-static int stream_try_lock(struct lttng_consumer_stream *stream)
-{
- int ret;
-
- assert(stream);
-
- /*
- * Try to lock the stream mutex. On failure, we know that the stream is
- * being used else where hence there is data still being extracted.
- */
- ret = pthread_mutex_trylock(&stream->lock);
- if (ret) {
- /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
- ret = 0;
- goto end;
- }
-
- ret = 1;
-
-end:
- return ret;
-}
-
/*
* Search for a relayd associated to the session id and return the reference.
*
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
- relayd = find_relayd_by_session_id(id);
- if (relayd) {
- /* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_begin_data_pending(&relayd->control_sock,
- relayd->relayd_session_id);
- if (ret < 0) {
- /* Communication error thus the relayd so no data pending. */
- ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
- lttng_consumer_cleanup_relayd(relayd);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- goto data_not_pending;
- }
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- }
-
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&id, lttng_ht_seed),
ht->match_fct, &id,
&iter.iter, stream, node_session_id.node) {
- /* If this call fails, the stream is being used hence data pending. */
- ret = stream_try_lock(stream);
- if (!ret) {
- goto data_pending;
- }
+ pthread_mutex_lock(&stream->lock);
/*
* A removed node from the hash table indicates that the stream has
}
}
- /* Relayd check */
- if (relayd) {
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ relayd = find_relayd_by_session_id(id);
+ if (relayd) {
+ unsigned int is_data_inflight = 0;
+
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_begin_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id);
+ if (ret < 0) {
+ /* Communication error thus the relayd so no data pending. */
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto data_not_pending;
+ }
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
+ &iter.iter, stream, node_session_id.node) {
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock,
stream->relayd_stream_id);
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
}
+ if (ret == 1) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&stream->lock);
+ goto data_pending;
+ }
if (ret < 0) {
ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
pthread_mutex_unlock(&stream->lock);
goto data_not_pending;
}
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret == 1) {
- pthread_mutex_unlock(&stream->lock);
- goto data_pending;
- }
}
- pthread_mutex_unlock(&stream->lock);
- }
-
- if (relayd) {
- unsigned int is_data_inflight = 0;
- /* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ /* Send end command for data pending. */
ret = relayd_end_data_pending(&relayd->control_sock,
relayd->relayd_session_id, &is_data_inflight);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_not_pending;
}
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (is_data_inflight) {
goto data_pending;
}