X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=292bd393874a85c1bffdb3800f54398cb31fdd0d;hb=9a951630b590d14966e19b8d54f078fe174c1f47;hp=0539eeb7be17143a98d7741d5448552d7fe2e862;hpb=089623df79c3215cb00d8f35aeb82eea1914eca4;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 0539eeb7b..292bd3938 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -792,13 +792,12 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, ret = relayd_add_stream(&relayd->control_sock, stream->name, path, &stream->relayd_stream_id, stream->chan->tracefile_size, stream->chan->tracefile_count); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto end; } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); uatomic_inc(&relayd->refcount); stream->sent_to_relayd = 1; @@ -836,13 +835,12 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_streams_sent(&relayd->control_sock); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto end; } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } else { ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", net_seq_idx); @@ -3540,34 +3538,6 @@ error_nosignal: return ret; } -/* - * Try to lock the stream mutex. - * - * On success, 1 is returned else 0 indicating that the mutex is NOT lock. - */ -static int stream_try_lock(struct lttng_consumer_stream *stream) -{ - int ret; - - assert(stream); - - /* - * Try to lock the stream mutex. On failure, we know that the stream is - * being used else where hence there is data still being extracted. - */ - ret = pthread_mutex_trylock(&stream->lock); - if (ret) { - /* For both EBUSY and EINVAL error, the mutex is NOT locked. */ - ret = 0; - goto end; - } - - ret = 1; - -end: - return ret; -} - /* * Search for a relayd associated to the session id and return the reference. * @@ -3634,31 +3604,11 @@ int consumer_data_pending(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; - relayd = find_relayd_by_session_id(id); - if (relayd) { - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_begin_data_pending(&relayd->control_sock, - relayd->relayd_session_id); - if (ret < 0) { - /* Communication error thus the relayd so no data pending. */ - ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); - lttng_consumer_cleanup_relayd(relayd); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - goto data_not_pending; - } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - } - cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct, &id, &iter.iter, stream, node_session_id.node) { - /* If this call fails, the stream is being used hence data pending. */ - ret = stream_try_lock(stream); - if (!ret) { - goto data_pending; - } + pthread_mutex_lock(&stream->lock); /* * A removed node from the hash table indicates that the stream has @@ -3676,9 +3626,29 @@ int consumer_data_pending(uint64_t id) } } - /* Relayd check */ - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + } + + relayd = find_relayd_by_session_id(id); + if (relayd) { + unsigned int is_data_inflight = 0; + + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_begin_data_pending(&relayd->control_sock, + relayd->relayd_session_id); + if (ret < 0) { + /* Communication error thus the relayd so no data pending. */ + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + goto data_not_pending; + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, &id, + &iter.iter, stream, node_session_id.node) { if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock, stream->relayd_stream_id); @@ -3687,6 +3657,11 @@ int consumer_data_pending(uint64_t id) stream->relayd_stream_id, stream->next_net_seq_num - 1); } + if (ret == 1) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + goto data_pending; + } if (ret < 0) { ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); @@ -3694,29 +3669,17 @@ int consumer_data_pending(uint64_t id) pthread_mutex_unlock(&stream->lock); goto data_not_pending; } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret == 1) { - pthread_mutex_unlock(&stream->lock); - goto data_pending; - } } - pthread_mutex_unlock(&stream->lock); - } - - if (relayd) { - unsigned int is_data_inflight = 0; - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + /* Send end command for data pending. */ ret = relayd_end_data_pending(&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto data_not_pending; } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (is_data_inflight) { goto data_pending; }