-/*
- * Create and add connection to the given hash table.
- *
- * Return poll add value or else -1 on error.
- */
-static
-int add_connection(int fd, struct lttng_poll_event *events,
- struct lttng_ht *relay_connections_ht)
-{
- int ret;
- struct relay_command *relay_connection;
-
- assert(events);
- assert(relay_connections_ht);
-
- relay_connection = zmalloc(sizeof(struct relay_command));
- if (relay_connection == NULL) {
- PERROR("Relay command zmalloc");
- goto error;
- }
-
- do {
- ret = read(fd, relay_connection, sizeof(*relay_connection));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret < sizeof(*relay_connection)) {
- PERROR("read relay cmd pipe");
- goto error_read;
- }
-
- lttng_ht_node_init_ulong(&relay_connection->sock_n,
- (unsigned long) relay_connection->sock->fd);
- rcu_read_lock();
- lttng_ht_add_unique_ulong(relay_connections_ht,
- &relay_connection->sock_n);
- rcu_read_unlock();
-
- return lttng_poll_add(events, relay_connection->sock->fd,
- LPOLLIN | LPOLLRDHUP);
-
-error_read:
- free(relay_connection);
-error:
- return -1;
-}
-
-static
-void deferred_free_connection(struct rcu_head *head)
-{
- struct relay_command *relay_connection =
- caa_container_of(head, struct relay_command, rcu_node);
-
- if (relay_connection->session &&
- relay_connection->session->viewer_attached > 0) {
- relay_connection->session->viewer_attached--;
- }
- lttcomm_destroy_sock(relay_connection->sock);
- free(relay_connection);
-}
-
-static
-void deferred_free_viewer_stream(struct rcu_head *head)
-{
- struct relay_viewer_stream *stream =
- caa_container_of(head, struct relay_viewer_stream, rcu_node);
-
- if (stream->ctf_trace) {
- uatomic_dec(&stream->ctf_trace->refcount);
- assert(uatomic_read(&stream->ctf_trace->refcount) >= 0);
- if (uatomic_read(&stream->ctf_trace->refcount) == 0) {
- DBG("Freeing ctf_trace %" PRIu64, stream->ctf_trace->id);
- free(stream->ctf_trace);
- }
- }
-
- free(stream->path_name);
- free(stream->channel_name);
- free(stream);
-}
-
-static
-void viewer_del_streams(struct lttng_ht *viewer_streams_ht,
- struct relay_session *session)
-{
- int ret;
- struct relay_viewer_stream *stream;
- struct lttng_ht_node_u64 *node;
- struct lttng_ht_iter iter;
-
- assert(viewer_streams_ht);
- assert(session);
-
- rcu_read_lock();
- cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, node, node) {
- node = lttng_ht_iter_get_node_u64(&iter);
- if (!node) {
- continue;
- }
-
- stream = caa_container_of(node, struct relay_viewer_stream, stream_n);
- if (stream->session_id != session->id) {
- continue;
- }
-
- if (stream->read_fd > 0) {
- ret = close(stream->read_fd);
- if (ret < 0) {
- PERROR("close read_fd");
- }
- }
- if (stream->index_read_fd > 0) {
- ret = close(stream->index_read_fd);
- if (ret < 0) {
- PERROR("close index_read_fd");
- }
- }
- if (stream->metadata_flag && stream->ctf_trace) {
- stream->ctf_trace->metadata_sent = 0;
- }
- ret = lttng_ht_del(viewer_streams_ht, &iter);
- assert(!ret);
- call_rcu(&stream->rcu_node, deferred_free_viewer_stream);
- }
- rcu_read_unlock();
-}
-
-/*
- * Delete and free a connection.
- *
- * RCU read side lock MUST be acquired.
- */
-static
-void del_connection(struct lttng_ht *relay_connections_ht,
- struct lttng_ht_iter *iter, struct relay_command *relay_connection,
- struct lttng_ht *viewer_streams_ht)
-{
- int ret;
-
- assert(relay_connections_ht);
- assert(iter);
- assert(relay_connection);
- assert(viewer_streams_ht);
-
- ret = lttng_ht_del(relay_connections_ht, iter);
- assert(!ret);
-
- if (relay_connection->session) {
- viewer_del_streams(viewer_streams_ht, relay_connection->session);
- }
-
- call_rcu(&relay_connection->rcu_node, deferred_free_connection);
-}
-