From c7e794b7ea1e10f25e64ba29f740890f3aa93b0a Mon Sep 17 00:00:00 2001 From: Richard Thier Date: Tue, 16 Aug 2022 19:24:27 +0200 Subject: [PATCH] trying more ILP in spsort - not much success and will be reverted --- space_partitioning_sort/spsort.h | 38 +++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/space_partitioning_sort/spsort.h b/space_partitioning_sort/spsort.h index c406526..313bbd0 100644 --- a/space_partitioning_sort/spsort.h +++ b/space_partitioning_sort/spsort.h @@ -133,11 +133,19 @@ inline uint32_t internal_mid(uint32_t low, uint32_t high) { inline void spsort(uint32_t *t, int n, int m = 32); /** Helper function that puts elements higher then mid to the top of the array and lower to the bottom. Returns number of bottoms */ -inline int internal_array_separate(uint32_t *t, int n, uint32_t mid) { +inline int internal_array_separate(uint32_t *t, int n, uint32_t mid, int bulk_xchg = 32) { if(n > 0) { // Two heads that also read & write (both) int left = 0; int right = n - 1; + + // These are needed for more ILP so that we can do the xchg operations in bulk + // and without data dependencies just do it in an unrolled loop from time to time! + std::vector xchg_left(0); + std::vector xchg_right(0); + xchg_left.reserve(bulk_xchg); + xchg_right.reserve(bulk_xchg); + while(left < right) { // Step over already good positioned values from left while((left < right) && (t[left] < mid)) { @@ -152,12 +160,30 @@ inline int internal_array_separate(uint32_t *t, int n, uint32_t mid) { // Extra check needed for edge-case! if(left < right) { // Both in wrong location - xchg them! - auto tmp = t[right]; - t[right] = t[left]; - t[left] = tmp; - ++left; - --right; + // instead of doing it right here, collect them up! (*) + xchg_left.push_back(left); + xchg_right.push_back(right); } + + // See if we can do some bulk-exchange now (*) + // This loop the compiler should more easily unroll + // and the CPU should be able to schedule ILP-wise! + if(xchg_left.size() <= bulk_xchg) { + for(int i = 0; i < xchg_left.size(); ++i) { + auto tmp = t[xchg_left[i]]; + t[xchg_left[i]] = t[xchg_right[i]]; + t[xchg_right[i]] = tmp; + } + xchg_left.resize(0); + xchg_right.resize(0); + } + } + + // Finish the remaining bulk exchanges (*) + for(int i = 0; i < xchg_left.size(); ++i) { + auto tmp = t[xchg_left[i]]; + t[xchg_left[i]] = t[xchg_right[i]]; + t[xchg_right[i]] = tmp; } // Edge-case increment if single elem happens in middle in the end