#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
+#include <inttypes.h>
#include <common/common.h>
#include <common/kernel-ctl/kernel-ctl.h>
uatomic_dec(&relayd->refcount);
assert(uatomic_read(&relayd->refcount) >= 0);
+ /* Closing streams requires to lock the control socket. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_send_close_stream(&relayd->control_sock,
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
DBG("Unable to close stream on the relayd. Continuing");
/*
}
/*
- * Add relayd socket to global consumer data hashtable.
+ * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
+ * be acquired before calling this.
*/
+
int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret = 0;
goto end;
}
- rcu_read_lock();
-
lttng_ht_lookup(consumer_data.relayd_ht,
(void *)((unsigned long) relayd->net_seq_idx), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
- rcu_read_unlock();
/* Relayd already exist. Ignore the insertion */
goto end;
}
lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
- rcu_read_unlock();
-
end:
return ret;
}
PERROR("write metadata stream id");
goto end;
}
- DBG("Metadata stream id %zu written before data",
+ DBG("Metadata stream id %" PRIu64 " written before data",
stream->relayd_stream_id);
end:
}
goto end;
} else if (ret > len) {
- PERROR("Error in file write (ret %ld > len %lu)", ret, len);
+ PERROR("Error in file write (ret %zd > len %lu)", ret, len);
written += ret;
goto end;
} else {
len -= ret;
mmap_offset += ret;
}
- DBG("Consumer mmap write() ret %ld (len %lu)", ret, len);
+ DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {