ret = relayd_send_close_stream(&relayd->control_sock,
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- DBG("Unable to close stream on the relayd. Continuing");
- /*
- * Continue here. There is nothing we can do for the relayd.
- * Chances are that the relayd has closed the socket so we just
- * continue cleaning up.
- */
+ ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->id);
+ lttng_consumer_cleanup_relayd(relayd);
}
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
uatomic_read(&relayd->destroy_flag)) {
consumer_destroy_relayd(relayd);
}
- stream->net_seq_idx = (uint64_t) -1ULL;
+ stream->relayd_id = (uint64_t) -1ULL;
stream->sent_to_relayd = 0;
}
stream->out_fd = -1;
}
- if (stream->index_fd >= 0) {
- ret = close(stream->index_fd);
- if (ret) {
- PERROR("close stream index_fd");
- }
- stream->index_fd = -1;
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
}
/* Check and cleanup relayd if needed. */
rcu_read_lock();
- relayd = consumer_find_relayd(stream->net_seq_idx);
+ relayd = consumer_find_relayd(stream->relayd_id);
if (relayd != NULL) {
consumer_stream_relayd_close(stream, relayd);
}
* Return 0 on success or else a negative value.
*/
int consumer_stream_write_index(struct lttng_consumer_stream *stream,
- struct ctf_packet_index *index)
+ struct ctf_packet_index *element)
{
int ret;
- struct consumer_relayd_sock_pair *relayd;
assert(stream);
- assert(index);
+ assert(element);
rcu_read_lock();
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd) {
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_send_index(&relayd->control_sock, index,
+ if (stream->relayd_id != (uint64_t) -1ULL) {
+ struct consumer_relayd_sock_pair *relayd;
+ relayd = consumer_find_relayd(stream->relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_send_index(&relayd->control_sock, element,
stream->relayd_stream_id, stream->next_net_seq_num - 1);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ /*
+ * Communication error with lttng-relayd,
+ * perform cleanup now
+ */
+ ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->id);
+ lttng_consumer_cleanup_relayd(relayd);
+ ret = -1;
+ }
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
+ stream->key, stream->relayd_id);
+ ret = -1;
+ }
} else {
- ssize_t size_ret;
-
- size_ret = index_write(stream->index_fd, index,
- sizeof(struct ctf_packet_index));
- if (size_ret < sizeof(struct ctf_packet_index)) {
+ if (lttng_index_file_write(stream->index_file, element)) {
ret = -1;
} else {
ret = 0;