X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=cac87635ed46c2044f72c44e0e99940c6af1af17;hp=2a59d1ed3e166edd319209ea97065eebf81e3876;hb=65d66b193154da8d3a153e6ae6422d9c302e5fb1;hpb=bda7c7b9b4c633de16f3d8bf109f9d21fdd9a5fb diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 2a59d1ed3..cac87635e 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -354,26 +354,59 @@ void try_stream_close(struct relay_stream *stream) } stream->close_requested = true; - /* - * We shortcut the data pending check if no bound is known for this - * stream. This prevents us from never closing the stream in the case - * where a connection would be closed before a "close" command has - * been received. - * - * TODO - * This still leaves open the question of handling missing data after - * a bound has been set by a stream close command. Since we have no - * way of pairing data and control connection, and that a data - * connection has no ownership of a stream, it is likely that a - * timeout approach would be appropriate to handle dangling streams. - */ + + if (stream->last_net_seq_num == -1ULL) { + /* + * Handle connection close without explicit stream close + * command. + * + * We can be clever about indexes partially received in + * cases where we received the data socket part, but not + * the control socket part: since we're currently closing + * the stream on behalf of the control socket, we *know* + * there won't be any more control information for this + * socket. Therefore, we can destroy all indexes for + * which we have received only the file descriptor (from + * data socket). This takes care of consumerd crashes + * between sending the data and control information for + * a packet. Since those are sent in that order, we take + * care of consumerd crashes. 
+ */ + relay_index_close_partial_fd(stream); + /* + * Use the highest net_seq_num we currently have pending + * as end of stream indicator. Leave last_net_seq_num + * at -1ULL if we cannot find any index. + */ + stream->last_net_seq_num = relay_index_find_last(stream); + /* Fall-through into the next check. */ + } + if (stream->last_net_seq_num != -1ULL && ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) { - /* Don't close since we still have data pending. */ + /* + * Don't close since we still have data pending. This + * handles cases where an explicit close command has + * been received for this stream, and cases where the + * connection has been closed, and we are waiting for + * index information from the data socket. It is + * therefore expected that all the index fd information + * we need has already been received on the control + * socket. Matching index information from data socket + * should be Expected Soon(TM). + * + * TODO: We should implement a timer to garbage collect + * streams after a timeout to be resilient against a + * consumerd implementation that would not match this + * expected behavior. + */ pthread_mutex_unlock(&stream->lock); DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle); return; } + /* + * We received all the indexes we can expect. + */ stream_unpublish(stream); stream->closed = true; /* Relay indexes are only used by the "consumer/sessiond" end. */