* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <common/common.h>
#include <common/utils.h>
/*
 * Try to take a reference on the stream.
 *
 * Returns true when a reference was acquired, and false when the
 * reference count was already zero, in which case no reference is
 * taken.
 *
 * Should be called with RCU read-side lock held.
 */
bool stream_get(struct relay_stream *stream)
{
- bool has_ref = false;
-
- pthread_mutex_lock(&stream->reflock);
- if (stream->ref.refcount != 0) {
- has_ref = true;
- urcu_ref_get(&stream->ref);
- }
- pthread_mutex_unlock(&stream->reflock);
-
- return has_ref;
+ return urcu_ref_get_unless_zero(&stream->ref);
}
/*
struct relay_stream *stream_create(struct ctf_trace *trace,
uint64_t stream_handle, char *path_name,
char *channel_name, uint64_t tracefile_size,
- uint64_t tracefile_count)
+ uint64_t tracefile_count,
+ const struct relay_stream_chunk_id *chunk_id)
{
int ret;
struct relay_stream *stream = NULL;
stream = zmalloc(sizeof(struct relay_stream));
if (stream == NULL) {
PERROR("relay stream zmalloc");
- ret = -1;
goto error_no_alloc;
}
stream->tracefile_count = tracefile_count;
stream->path_name = path_name;
stream->channel_name = channel_name;
+ stream->rotate_at_seq_num = -1ULL;
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
- pthread_mutex_init(&stream->reflock, NULL);
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
+ stream->current_chunk_id = *chunk_id;
stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!stream->indexes_ht) {
DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
}
- if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
+ if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, LTTNG_NAME_MAX)) {
stream->is_metadata = 1;
}
/*
* Stream must be protected by holding the stream lock or by virtue of being
- * called from stream_destroy, in which case it is guaranteed to be accessed
- * from a single thread by the reflock.
+ * called from stream_destroy.
*/
static void stream_unpublish(struct relay_stream *stream)
{
static void stream_destroy(struct relay_stream *stream)
{
if (stream->indexes_ht) {
+ /*
+ * Calling lttng_ht_destroy in call_rcu worker thread so
+ * we don't hold the RCU read-side lock while calling
+ * it.
+ */
lttng_ht_destroy(stream->indexes_ht);
}
if (stream->tfa) {
/*
* No need to take stream->lock since this is only called on the final
* stream_put which ensures that a single thread may act on the stream.
- *
- * At that point, the object is also protected by the reflock which
- * guarantees that no other thread may share ownership of this stream.
*/
static void stream_release(struct urcu_ref *ref)
{
stream_fd_put(stream->stream_fd);
stream->stream_fd = NULL;
}
- if (stream->index_fd) {
- stream_fd_put(stream->index_fd);
- stream->index_fd = NULL;
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
}
if (stream->trace) {
ctf_trace_put(stream->trace);
void stream_put(struct relay_stream *stream)
{
DBG("stream put for stream id %" PRIu64, stream->stream_handle);
- /*
- * Ensure existence of stream->reflock for stream unlock.
- */
rcu_read_lock();
- /*
- * Stream reflock ensures that concurrent test and update of
- * stream ref is atomic.
- */
- pthread_mutex_lock(&stream->reflock);
assert(stream->ref.refcount != 0);
/*
* Wait until we have processed all the stream packets before
stream->stream_handle,
(int) stream->ref.refcount);
urcu_ref_put(&stream->ref, stream_release);
- pthread_mutex_unlock(&stream->reflock);
rcu_read_unlock();
}
void try_stream_close(struct relay_stream *stream)
{
+ bool session_aborted;
+ struct relay_session *session = stream->trace->session;
+
DBG("Trying to close stream %" PRIu64, stream->stream_handle);
+
+ pthread_mutex_lock(&session->lock);
+ session_aborted = session->aborted;
+ pthread_mutex_unlock(&session->lock);
+
pthread_mutex_lock(&stream->lock);
/*
* Can be called concurrently by connection close and reception of last
}
stream->close_requested = true;
- /*
- * We shortcut the data pending check if no bound is known for this
- * stream. This prevents us from never closing the stream in the case
- * where a connection would be closed before a "close" command has
- * been received.
- *
- * TODO
- * This still leaves open the question of handling missing data after
- * a bound has been set by a stream close command. Since we have no
- * way of pairing data and control connection, and that a data
- * connection has no ownership of a stream, it is likely that a
- * timeout approach would be appropriate to handle dangling streams.
- */
+
+ if (stream->last_net_seq_num == -1ULL) {
+ /*
+ * Handle connection close without explicit stream close
+ * command.
+ *
+ * We can be clever about indexes partially received in
+ * cases where we received the data socket part, but not
+ * the control socket part: since we're currently closing
+ * the stream on behalf of the control socket, we *know*
+ * there won't be any more control information for this
+ * socket. Therefore, we can destroy all indexes for
+ * which we have received only the file descriptor (from
+ * data socket). This takes care of consumerd crashes
+ * occurring between sending the data and the control
+ * information for a packet, since those are sent in that
+ * order.
+ */
+ DBG("relay_index_close_partial_fd");
+ relay_index_close_partial_fd(stream);
+ /*
+ * Use the highest net_seq_num we currently have pending
+ * as the end-of-stream indicator. Leave last_net_seq_num
+ * at -1ULL if we cannot find any index.
+ */
+ stream->last_net_seq_num = relay_index_find_last(stream);
+ DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
+ /* Fall-through into the next check. */
+ }
+
if (stream->last_net_seq_num != -1ULL &&
- ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
- /* Don't close since we still have data pending. */
+ ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0
+ && !session_aborted) {
+ /*
+ * Don't close since we still have data pending. This
+ * handles cases where an explicit close command has
+ * been received for this stream, and cases where the
+ * connection has been closed, and we are waiting for
+ * index information from the data socket. It is
+ * therefore expected that all the index fd information
+ * we need has already been received on the control
+ * socket. Matching index information from data socket
+ * should be Expected Soon(TM).
+ *
+ * TODO: We should implement a timer to garbage collect
+ * streams after a timeout to be resilient against a
+ * consumerd implementation that would not match this
+ * expected behavior.
+ */
pthread_mutex_unlock(&stream->lock);
DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
return;
}
+ /*
+ * We received all the indexes we can expect.
+ */
stream_unpublish(stream);
stream->closed = true;
/* Relay indexes are only used by the "consumer/sessiond" end. */
stream_put(stream);
}
+static void print_stream_indexes(struct relay_stream *stream)
+{ /* Debug helper: logs, through DBG, every relay index found in stream->indexes_ht. */
+ struct lttng_ht_iter iter;
+ struct relay_index *index;
+
+ rcu_read_lock(); /* Read-side lock is held across the whole hash table traversal. */
+ cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index,
+ index_n.node) { /* One DBG line is emitted per index entry. */
+ DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
+ " stream %" PRIu64 " trace %" PRIu64
+ " session %" PRIu64,
+ index,
+ index->index_n.key, /* The node key is what gets printed as net_seq_num. */
+ stream->ref.refcount, /* NOTE(review): this is the stream's refcount, not the index's; confirm intended. */
+ index->stream->stream_handle,
+ index->stream->trace->id,
+ index->stream->trace->session->id);
+ }
+ rcu_read_unlock();
+}
+
void print_relay_streams(void)
{
struct lttng_ht_iter iter;
struct relay_stream *stream;
+ if (!relay_streams_ht) {
+ return;
+ }
+
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
node.node) {
stream->stream_handle,
stream->trace->id,
stream->trace->session->id);
+ print_stream_indexes(stream);
stream_put(stream);
}
rcu_read_unlock();