+static
+int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_rotate_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct lttng_ht_iter iter;
+ struct relay_stream *stream;
+ int ret;
+ uint64_t chunk_id;
+ uint32_t rotate_pending;
+
+ DBG("Rotate pending command received");
+ fprintf(stderr, "Rotate pending command received\n");
+
+ if (!session || conn->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ if (session->minor < 11) {
+ ERR("Unsupported feature before 2.11");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid rotate_pending struct size : %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ chunk_id = be64toh(msg.chunk_id);
+
+ rotate_pending = 0;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (!stream_get(stream)) {
+ continue;
+ }
+ if (stream->trace->session != session) {
+ stream_put(stream);
+ continue;
+ }
+ pthread_mutex_lock(&stream->lock);
+ if (stream->rotate_at_seq_num != -1ULL) {
+ rotate_pending = 1;
+ DBG("Stream %" PRIu64 " is still rotating",
+ stream->stream_handle);
+ } else if (stream->chunk_id < chunk_id) {
+ rotate_pending = 1;
+ DBG("Stream %" PRIu64 " did not exist on the consumer "
+ "when the last rotation started, but is"
+ "still waiting for data before getting"
+ "closed",
+ stream->stream_handle);
+ }
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+ if (rotate_pending) {
+ goto send_reply;
+ }
+ }
+ rcu_read_unlock();
+
+send_reply:
+ memset(&reply, 0, sizeof(reply));
+ reply.ret_code = htobe32(rotate_pending);
+ ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay rotate pending ret code failed");
+ }
+
+end_no_session:
+ return ret;
+}
+