Add consumer socket object and relayd commands
[lttng-tools.git] / src / common / consumer.c
index 831592a1e96ef4a8a22b64fdc29ffeaa75f62786..63d0d65ee3157b05ef1113336185535ea23a9dbe 100644 (file)
@@ -177,11 +177,18 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
        int ret;
        struct lttng_ht_iter iter;
 
+       if (relayd == NULL) {
+               return;
+       }
+
        DBG("Consumer destroy and close relayd socket pair");
 
        iter.iter.node = &relayd->node.node;
        ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
-       assert(!ret);
+       if (ret != 0) {
+               /* We assume the relayd was already destroyed */
+               return;
+       }
 
        /* Close all sockets */
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
@@ -266,8 +273,18 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
        if (relayd != NULL) {
                uatomic_dec(&relayd->refcount);
                assert(uatomic_read(&relayd->refcount) >= 0);
-               if (uatomic_read(&relayd->refcount) == 0) {
-                       /* Refcount of the relayd struct is 0, destroy it */
+
+               ret = relayd_send_close_stream(&relayd->control_sock,
+                               stream->relayd_stream_id,
+                               stream->next_net_seq_num - 1);
+               if (ret < 0) {
+                       ERR("Unable to close stream on the relayd. Continuing");
+                       /* Continue here. There is nothing we can do for the relayd.*/
+               }
+
+               /* Both conditions are met, we destroy the relayd. */
+               if (uatomic_read(&relayd->refcount) == 0 &&
+                               uatomic_read(&relayd->destroy_flag)) {
                        consumer_destroy_relayd(relayd);
                }
        }
@@ -469,6 +486,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
 
        obj->net_seq_idx = net_seq_idx;
        obj->refcount = 0;
+       obj->destroy_flag = 0;
        lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
        pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
 
@@ -546,6 +564,7 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
                /* Set header with stream information */
                data_hdr.stream_id = htobe64(stream->relayd_stream_id);
                data_hdr.data_size = htobe32(data_size);
+               data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
                /* Other fields are zeroed previously */
 
                ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
This page took 0.026512 seconds and 5 git commands to generate.