diff --git a/threepass_xbit.h b/threepass_xbit.h index 4298664..840e779 100644 --- a/threepass_xbit.h +++ b/threepass_xbit.h @@ -20,6 +20,13 @@ static inline constexpr uint32_t min3u32_xb(uint32_t a, uint32_t b, uint32_t c) ((b <= c) ? b : c); } +static inline void tpxb_process_element(uint32_t num, uint32_t* arr, uint32_t* bucket, + uint32_t shr, uint32_t mask) { + auto bkeyni = (num >> shr) & mask; + auto offset = --bucket[bkeyni]; + arr[offset] = num; +} + /** * Simple three-pass (ok: 3 + 1) bottom-up internal radix sort writter for thiersort3 * @@ -91,44 +98,177 @@ static inline void threepass_xb(uint32_t *a, uint32_t *buf, int n) noexcept { // Bottom digit a->buf // right-to-left to ensure already sorted digits order we keep for iterations - #pragma GCC unroll 48 - for(uint32_t i = n; i > 0; --i) { - // Prefetch caches - //__builtin_prefetch(&a[i-8]); - // Get num and its new offset / location - auto num = a[i - 1]; + #pragma GCC unroll 3 + for(i = n; i >= 16; i -= 16) { + // Prefetch the NEXT block (not current) at optimal distance + if (i > 17) { // Ensure we don't prefetch out of bounds + __builtin_prefetch(&a[i - 17]); + } + if (i > 17*2) { // Ensure we don't prefetch out of bounds + __builtin_prefetch(&a[i - 17*2]); + } + if (i > 17*3) { // Ensure we don't prefetch out of bounds + __builtin_prefetch(&a[i - 17*3]); + } + + // Process 16 elements in reverse order + auto num15 = a[i - 1]; + auto num14 = a[i - 2]; + auto num13 = a[i - 3]; + auto num12 = a[i - 4]; + auto num11 = a[i - 5]; + auto num10 = a[i - 6]; + auto num9 = a[i - 7]; + auto num8 = a[i - 8]; + auto num7 = a[i - 9]; + auto num6 = a[i - 10]; + auto num5 = a[i - 11]; + auto num4 = a[i - 12]; + auto num3 = a[i - 13]; + auto num2 = a[i - 14]; + auto num1 = a[i - 15]; + auto num0 = a[i - 16]; + + // Process all 16 elements (your bucket logic here) + tpxb_process_element(num15, buf, bucket3, shr3, mask3); + tpxb_process_element(num14, buf, bucket3, shr3, mask3); + tpxb_process_element(num13, buf, bucket3, shr3, mask3); + tpxb_process_element(num12, buf, bucket3, shr3, mask3); + tpxb_process_element(num11, buf, bucket3, shr3, mask3); + tpxb_process_element(num10, buf, bucket3, shr3, mask3); + tpxb_process_element(num9, buf, bucket3, shr3, mask3); + tpxb_process_element(num8, buf, bucket3, shr3, mask3); + tpxb_process_element(num7, buf, bucket3, shr3, mask3); + tpxb_process_element(num6, buf, bucket3, shr3, mask3); + tpxb_process_element(num5, buf, bucket3, shr3, mask3); + tpxb_process_element(num4, buf, bucket3, shr3, mask3); + tpxb_process_element(num3, buf, bucket3, shr3, mask3); + tpxb_process_element(num2, buf, bucket3, shr3, mask3); + tpxb_process_element(num1, buf, bucket3, shr3, mask3); + tpxb_process_element(num0, buf, bucket3, shr3, mask3); + } + // Handle remainder (less than 16 elements) + for(uint32_t j = i; j > 0; --j) { + auto num = a[j - 1]; auto bkeyni = (num >> shr3) & mask3; auto offset = --bucket3[bkeyni]; - - // Add to the proper target location buf[offset] = num; } + // Mid digit buf->a // right-to-left to ensure already sorted digits order we keep for iterations - #pragma GCC unroll 48 - for(uint32_t i = n; i > 0; --i) { - // Prefetch caches - //__builtin_prefetch(&buf[i-8]); - // Get num and its new offset / location - auto num = buf[i - 1]; + #pragma GCC unroll 3 + for(i = n; i >= 16; i -= 16) { + // Prefetch the NEXT block (not current) at optimal distance + if (i > 17) { // Ensure we don't prefetch out of bounds + __builtin_prefetch(&buf[i - 17]); + } + if (i > 17*2) { // Ensure we don't prefetch out of bounds + __builtin_prefetch(&buf[i - 17*2]); + } + if (i > 17*3) { // Ensure we don't prefetch out of bounds + __builtin_prefetch(&buf[i - 17*3]); + } + + // Process 16 elements in reverse order + auto num15 = buf[i - 1]; + auto num14 = buf[i - 2]; + auto num13 = buf[i - 3]; + auto num12 = buf[i - 4]; + auto num11 = buf[i - 5]; + auto num10 = buf[i - 6]; + auto num9 = buf[i - 7]; + auto num8 = buf[i - 8]; + auto num7 = buf[i - 9]; + auto num6 = buf[i - 10]; + auto num5 = buf[i - 11]; + auto num4 = buf[i - 12]; + auto num3 = buf[i - 13]; + auto num2 = buf[i - 14]; + auto num1 = buf[i - 15]; + auto num0 = buf[i - 16]; + + // Process all 16 elements (your bucket logic here) + tpxb_process_element(num15, a, bucket2, shr2, mask2); + tpxb_process_element(num14, a, bucket2, shr2, mask2); + tpxb_process_element(num13, a, bucket2, shr2, mask2); + tpxb_process_element(num12, a, bucket2, shr2, mask2); + tpxb_process_element(num11, a, bucket2, shr2, mask2); + tpxb_process_element(num10, a, bucket2, shr2, mask2); + tpxb_process_element(num9, a, bucket2, shr2, mask2); + tpxb_process_element(num8, a, bucket2, shr2, mask2); + tpxb_process_element(num7, a, bucket2, shr2, mask2); + tpxb_process_element(num6, a, bucket2, shr2, mask2); + tpxb_process_element(num5, a, bucket2, shr2, mask2); + tpxb_process_element(num4, a, bucket2, shr2, mask2); + tpxb_process_element(num3, a, bucket2, shr2, mask2); + tpxb_process_element(num2, a, bucket2, shr2, mask2); + tpxb_process_element(num1, a, bucket2, shr2, mask2); + tpxb_process_element(num0, a, bucket2, shr2, mask2); + } + // Handle remainder (less than 16 elements) + for(uint32_t j = i; j > 0; --j) { + auto num = buf[j - 1]; auto bkeyni = (num >> shr2) & mask2; auto offset = --bucket2[bkeyni]; - - // Add to the proper target location a[offset] = num; } // Top digit a->buf // right-to-left to ensure already sorted digits order we keep for iterations - #pragma GCC unroll 48 - for(uint32_t i = n; i > 0; --i) { - // Prefetch caches - // __builtin_prefetch(&a[i-16]); - // Get num and its new offset / location - auto num = a[i - 1]; + #pragma GCC unroll 3 + for(i = n; i >= 16; i -= 16) { + // Prefetch the NEXT block (not current) at optimal distance + if (i > 17) { // Ensure we don't prefetch out of bounds + __builtin_prefetch(&a[i - 17]); + } + if (i > 17*2) { // Ensure we don't prefetch out of bounds + __builtin_prefetch(&a[i - 17*2]); + } + if (i > 17*3) { // Ensure we don't prefetch out of bounds + __builtin_prefetch(&a[i - 17*3]); + } + + // Process 16 elements in reverse order + auto num15 = a[i - 1]; + auto num14 = a[i - 2]; + auto num13 = a[i - 3]; + auto num12 = a[i - 4]; + auto num11 = a[i - 5]; + auto num10 = a[i - 6]; + auto num9 = a[i - 7]; + auto num8 = a[i - 8]; + auto num7 = a[i - 9]; + auto num6 = a[i - 10]; + auto num5 = a[i - 11]; + auto num4 = a[i - 12]; + auto num3 = a[i - 13]; + auto num2 = a[i - 14]; + auto num1 = a[i - 15]; + auto num0 = a[i - 16]; + + // Process all 16 elements (your bucket logic here) + tpxb_process_element(num15, buf, bucket1, shr1, mask1); + tpxb_process_element(num14, buf, bucket1, shr1, mask1); + tpxb_process_element(num13, buf, bucket1, shr1, mask1); + tpxb_process_element(num12, buf, bucket1, shr1, mask1); + tpxb_process_element(num11, buf, bucket1, shr1, mask1); + tpxb_process_element(num10, buf, bucket1, shr1, mask1); + tpxb_process_element(num9, buf, bucket1, shr1, mask1); + tpxb_process_element(num8, buf, bucket1, shr1, mask1); + tpxb_process_element(num7, buf, bucket1, shr1, mask1); + tpxb_process_element(num6, buf, bucket1, shr1, mask1); + tpxb_process_element(num5, buf, bucket1, shr1, mask1); + tpxb_process_element(num4, buf, bucket1, shr1, mask1); + tpxb_process_element(num3, buf, bucket1, shr1, mask1); + tpxb_process_element(num2, buf, bucket1, shr1, mask1); + tpxb_process_element(num1, buf, bucket1, shr1, mask1); + tpxb_process_element(num0, buf, bucket1, shr1, mask1); + } + // Handle remainder (less than 16 elements) + for(uint32_t j = i; j > 0; --j) { + auto num = a[j - 1]; auto bkeyni = (num >> shr1) & mask1; auto offset = --bucket1[bkeyni]; - - // Add to the proper target location buf[offset] = num; } } diff --git a/ypsu.cpp b/ypsu.cpp index 57388ef..ebd776e 100644 --- a/ypsu.cpp +++ b/ypsu.cpp @@ -895,8 +895,8 @@ int main(int argc, char **argv) { printf("Sorting %d elements:\n\n", n); // Uncomment this for profiling and alg! - measure_single(n); - return 0; + // measure_single(n); + // return 0; for (auto inputtype : inputtypes) { printf("%10s", inputtype.c_str());