X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=63d0d65ee3157b05ef1113336185535ea23a9dbe;hp=831592a1e96ef4a8a22b64fdc29ffeaa75f62786;hb=173af62f4804133d4a7f45e34b6f72126f3eca5f;hpb=f64161251bd649abe5b6a473531adfa3af9bd6b6 diff --git a/src/common/consumer.c b/src/common/consumer.c index 831592a1e..63d0d65ee 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -177,11 +177,18 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd) int ret; struct lttng_ht_iter iter; + if (relayd == NULL) { + return; + } + DBG("Consumer destroy and close relayd socket pair"); iter.iter.node = &relayd->node.node; ret = lttng_ht_del(consumer_data.relayd_ht, &iter); - assert(!ret); + if (ret != 0) { + /* We assume the relayd was already destroyed */ + return; + } /* Close all sockets */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -266,8 +273,18 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) if (relayd != NULL) { uatomic_dec(&relayd->refcount); assert(uatomic_read(&relayd->refcount) >= 0); - if (uatomic_read(&relayd->refcount) == 0) { - /* Refcount of the relayd struct is 0, destroy it */ + + ret = relayd_send_close_stream(&relayd->control_sock, + stream->relayd_stream_id, + stream->next_net_seq_num - 1); + if (ret < 0) { + ERR("Unable to close stream on the relayd. Continuing"); + /* Continue here. There is nothing we can do for the relayd.*/ + } + + /* Both conditions are met, we destroy the relayd. */ + if (uatomic_read(&relayd->refcount) == 0 && + uatomic_read(&relayd->destroy_flag)) { consumer_destroy_relayd(relayd); } } @@ -469,6 +486,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( obj->net_seq_idx = net_seq_idx; obj->refcount = 0; + obj->destroy_flag = 0; lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx); pthread_mutex_init(&obj->ctrl_sock_mutex, NULL); @@ -546,6 +564,7 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, /* Set header with stream information */ data_hdr.stream_id = htobe64(stream->relayd_stream_id); data_hdr.data_size = htobe32(data_size); + data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); /* Other fields are zeroed previously */ ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,