uint64_t stream_handle = -1ULL;
char *path_name = NULL, *channel_name = NULL;
uint64_t tracefile_size = 0, tracefile_count = 0;
+ struct relay_stream_chunk_id stream_chunk_id = { 0 };
if (!session || !conn->version_check_done) {
ERR("Trying to add a stream before version check");
goto end_no_session;
}
- switch (session->minor) {
- case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */
+ if (session->minor == 1) {
+ /* For 2.1 */
ret = cmd_recv_stream_2_1(payload, &path_name,
&channel_name);
- break;
- case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */
- default:
+ } else if (session->minor > 1 && session->minor < 11) {
+ /* From 2.2 to 2.10 */
ret = cmd_recv_stream_2_2(payload, &path_name,
&channel_name, &tracefile_size, &tracefile_count);
- break;
+ } else {
+ /* From 2.11 to ... */
+ ret = cmd_recv_stream_2_11(payload, &path_name,
+ &channel_name, &tracefile_size, &tracefile_count,
+ &stream_chunk_id.value);
+ stream_chunk_id.is_set = true;
}
+
if (ret < 0) {
goto send_reply;
}
/* We pass ownership of path_name and channel_name. */
stream = stream_create(trace, stream_handle, path_name,
- channel_name, tracefile_size, tracefile_count);
+ channel_name, tracefile_size, tracefile_count,
+ &stream_chunk_id);
path_name = NULL;
channel_name = NULL;
goto end_stream_unlock;
}
- stream->chunk_id = stream_info.new_chunk_id;
+ assert(stream->current_chunk_id.is_set);
+ stream->current_chunk_id.value = stream_info.new_chunk_id;
if (stream->is_metadata) {
/*
chunk_id = be64toh(msg.chunk_id);
- DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id);
+ DBG("Evaluating rotate pending for session \"%s\" and chunk id %" PRIu64,
+ session->session_name, chunk_id);
/*
* Iterate over all the streams in the session and check if they are
rotate_pending = true;
DBG("Stream %" PRIu64 " is still rotating",
stream->stream_handle);
- } else if (stream->chunk_id < chunk_id) {
+ } else if (stream->current_chunk_id.value < chunk_id) {
/*
* Stream closed on the consumer but still active on the
* relay.