368 lines
13 KiB
C++

#ifndef SPSORT_H
#define SPSORT_H
/*
* Sorting algorithm(s) that basically space-partition the domain of the sort.
*
* These all do some (usually linear or even const) operation on the input that
* "measures" the inputs domain. That is which (kind of) number ranges appear.
* Then we know more about the distribution of the elements when it comes to
* their value and thus can create "buckets" and partition in the space of the
* values in ways that separate them the most.
*
* Some of these algorithms are simple, some have dynamic data structures for
* bucketing similar to octrees in graphics (which served as main idea).
*
* We only implement some of these, but many variants are possible. If you
* only find a single implementation, that is fine too - also we might move
* this text to a markdown readme and separate algorithms into different files.
*/
#include <stdint.h>
#include <assert.h>
// FIXME: vector is for debugging
#include <vector>
/**
* A binary search based function
* to find the position
* where item should be inserted
* in a[low..high]
*/
int binary_search(uint32_t *a, int item, int low, int high) {
if (high <= low) {
return (item > a[low]) ?
(low + 1) : low;
}
int mid = (low + high) / 2;
if(item == a[mid]) return mid + 1;
if(item > a[mid]) return binary_search(a, item, mid + 1, high);
return binary_search(a, item, low, mid - 1);
}
/** Binary insertion sort */
void binsertion_sort(uint32_t *a, int n) {
uint32_t selected;
for (int i = 1; i < n; ++i) {
int j = i - 1;
selected = a[i];
// find location where selected should be inseretd
int loc = binary_search(a, selected, 0, j);
// Move all elements after location to create space
while (j >= loc) {
a[j + 1] = a[j];
j--;
}
a[j + 1] = selected;
}
}
/** Specific sort that sorts arrays that can only have two or one different values (but many of those). O(n) strictly linear! */
void twovalue_sort(uint32_t *t, int n) {
if(n > 0) {
// Prepare for counting
uint32_t v1 = t[0];
bool has_v2 = false;
uint32_t v2;
int c1 = 0;
int c2 = 0;
int i = 0;
// Find second value (if there is any) and stop there
while((i < n) && !has_v2) {
has_v2 = (t[i] != v1);
v2 = t[i]; // can be junk, but only used when has_v2 == true!
c1 += !has_v2;
c2 += has_v2;
++i;
}
// TODO: This only works for numbers from now on...
// But if we keep all until this point, we can
// do a similar "xchg to the sides" trick here
// by knowing which value is smaller and thus
// code this to work generally if needed!
// Finish counting loop
while(i < n) {
c1 += (v1 == t[i]); // count v1s
c2 += has_v2 ? (v2 == t[i]) : 0;
++i;
}
// We have counts of value variants, just write them back
// Get which is the smaller value and how many there is?
int sv = has_v2 ? ((v2 < v1) ? v2 : v1): v1;
int sc = has_v2 ? ((v2 < v1) ? c2 : c1): c1;
// Write out the smaller values
i = 0;
while(i < sc) {
t[i] = sv;
++i;
}
// Write out the big values for all remaining places
if(i < n) {
int bv = has_v2 ? ((v2 >= v1) ? v2 : v1): v1;
while(i < n) {
t[i] = bv;
++i;
}
}
}
}
/** Overflow-safe and generally safe */
inline uint32_t internal_mid(uint32_t low, uint32_t high) {
/* Unsigned-ness make this overflow-safe */
uint32_t mid = (low + high) >> 1;
assert(low <= mid);
assert(mid <= high);
return mid;
}
/** Sort using space-partitioning - does recursive re-pivoting */
inline void spsort(uint32_t *t, int n, int m = 32);
/** Helper function that puts elements higher then mid to the top of the array and lower to the bottom. Returns number of bottoms */
inline int internal_array_separate(uint32_t *t, int n, uint32_t mid, int bulk_xchg = 32) {
if(n > 0) {
// Two heads that also read & write (both)
int left = 0;
int right = n - 1;
// These are needed for more ILP so that we can do the xchg operations in bulk
// and without data dependencies just do it in an unrolled loop from time to time!
std::vector<int> xchg_left(0);
std::vector<int> xchg_right(0);
xchg_left.reserve(bulk_xchg);
xchg_right.reserve(bulk_xchg);
while(left < right) {
// Step over already good positioned values from left
while((left < right) && (t[left] < mid)) {
++left;
}
// Step over already good positioned values from right
while((left < right) && (t[right] >= mid)) {
--right;
}
// Extra check needed for edge-case!
if(left < right) {
// Both in wrong location - xchg them!
// instead of doing it right here, collect them up! (*)
xchg_left.push_back(left);
xchg_right.push_back(right);
}
// See if we can do some bulk-exchange now (*)
// This loop the compiler should more easily unroll
// and the CPU should be able to schedule ILP-wise!
if(xchg_left.size() <= bulk_xchg) {
for(int i = 0; i < xchg_left.size(); ++i) {
auto tmp = t[xchg_left[i]];
t[xchg_left[i]] = t[xchg_right[i]];
t[xchg_right[i]] = tmp;
}
xchg_left.resize(0);
xchg_right.resize(0);
}
}
// Finish the remaining bulk exchanges (*)
for(int i = 0; i < xchg_left.size(); ++i) {
auto tmp = t[xchg_left[i]];
t[xchg_left[i]] = t[xchg_right[i]];
t[xchg_right[i]] = tmp;
}
// Edge-case increment if single elem happens in middle in the end
left += ((left == right) && (t[left] < mid));
return left;
}
return 0;
}
// ALGO: [spsort] In-place variant that looks like a quicksort-kind-of-thing:
// - I realized that in-placing make this similar to quicksort actually
// - We check if the thing is small enough for insertion sorting in-place
// - If not, we calculate low-mid-high
// - Then we separate the array in low and high (relative to mid) by going two pointers left-and-right additions.
// - When loop ends, we have two smallers arrays to process.
// - Issue is that if the distribution of input is not equal distribution (or close), then
// we can end up many on left and none on right for example!
// - To solve that issue, we check if a side is below size of "m" bucket count.
// - If below, we run insertion sort on that.
// - If not below, we recurse totally - including min-max search domain pivoting!!! [re-pivoting]
// - If none of the new buckets too small, both go just partial recursion: pivot calculation not happen and
// we provide a pivot (mid) being right between our low-mid or mid-high pairs (depending side).
/** Helper function for spsort - does not do re-pivoting and knows the domain */
inline void internal_spsort(uint32_t *t, int n, int m, uint32_t low, uint32_t mid, uint32_t high) {
// No reason why we need to sort 1-big buckets...
// Also rules out n == 0 and n == 1 bad cases later if we do API this way.
assert(n > 1);
// Makes small arrays sort fast otherwise we need to go down m == 2 or such!
if(n < m) {
binsertion_sort(t, n);
return;
}
auto left = internal_array_separate(t, n, mid);
auto lefts = left;
auto rights = n - left;
// Re-pivot the mid if needed
if(lefts < m) {
// Left can be simple-sorted
binsertion_sort(t, left);
// Check the right now
if(rights < m) {
// Balance in both small-extremal
// means we can small-sort both.
binsertion_sort(t + left, rights);
} else {
// RE-PIVOT!!! [right]
// This tries to help for cases where the
// domain of da values have a few outliers
// which would cause many unnecessity split.
//
// This way the midpoint and all totally get
// re-evaluated too, which adds an O(n) here
// but make us have fewer extra steps hopefully.
if(lefts != 0) {
// Regular case (left had some elements at least)
spsort(t + left, rights, m);
} else {
// Extreme case where left was empty!
// This would lead to endless recursion
// so we need to handle this separately!
//
// When left is totally empty that means
// that right bucket can only have at most
// two distict values among its elements or
// less (that is it is either const values
// in a big array OR there are two different
// values in it in any ways). For this we
// implemented a special and optimized sort...
twovalue_sort(t + left, rights); // O(n)
}
}
} else {
// lefts are enough in count
if(rights < m) {
// if rights are too few, we insertion sort them
binsertion_sort(t + left, rights);
// RE-PIVOT!!! [left]
// and we also need repivot similar to above!
spsort(t, lefts, m);
} else {
// Both are non-extremal cases, which means balance too
// but this case we can calculate the mid by guesswork!
internal_spsort(t, lefts, m, low, internal_mid(low, mid), mid);
internal_spsort(t + left, rights, m, mid, internal_mid(mid, high), high);
}
}
}
/** Sort using space-partitioning - does recursive re-pivoting */
inline void spsort(uint32_t *t, int n, int m) {
assert(n * int64_t(sizeof(t[0])) <= INT_MAX);
if(n > 0) {
/* 1.) DOM min-max search. O(n) */
uint32_t low = UINT32_MAX;
uint32_t high = 0;
for(int i = 0; i < n; ++i) {
if(t[i] < low) low = t[i];
if(t[i] > high) high = t[i];
}
uint32_t mid = internal_mid(low, high); // pivot
/* 2.) Sort with space-partitioning */
internal_spsort(t, n, m, low, mid, high);
}
}
// TODO: [spsort_copy] The algorithm for the non-in-place (copy) variant:
// - Basically we take the data and measure the domain (min-max) in O(n)
// - We then go over the input again (n times the following):
// -- Put the value in the bucket where it belongs (octree-insert-1D operation)
// -- If bucket grows too big (more than 'm' elements), then split the bucket contents.
// -- Split can be binary OR any kind of n-ary split in the algorithm of ours.
// -- An O(m) operation splits into the sub-buckets
// -- So far so good, because insertion point finding is O(logn) and this is O(m) which latter is const.
// -- The problem is that in the worst case however, We generated something that we need to split right again!
// -- Actually that is not the worst yet, the worst is that we need to do so yet we not win on that!
// -- When that happens? When the values have a very few outliers and a bunch of "regular" data on the other end of domain...
// -- I advise to re-evaluate the pivot value at this point to avoid too many unnecessary steps (min-max on the bucket).
/** Sort using space-partitioning - not in-place! */
inline void spsort_copy(uint32_t *t, int n) {
assert(n * int64_t(sizeof(t[0])) <= INT_MAX);
if(n > 0) {
/* 1.) DOM min-max search. O(n) */
uint32_t min = UINT32_MAX;
uint32_t max = 0;
for(int i = 0; i < n; ++i) {
if(t[i] < min) min = t[i];
if(t[i] > max) max = t[i];
}
/* 2.) Sort with space-partitioning */
/* Unsigned-ness make this overflow-safe */
uint32_t pivot = (min + max) >> 1;
for(int i = 0; i < n; ++i) {
// TODO: using classes and malloc (linked data)?
}
}
}
// TODO: [spsort_inplace] In-place variant 2!
// See spsort_copy for more info...
//
// Step 1:
// - Do min-max just like before
// - We in the meantime split the input in two: based on the top bit. This is a radix-like step just on top bit.
// - In Step2 we work on two arrays: one has top bits all set, other has not. We can thus use the top bit as flag.
// - This happens in-place by swapping onto the left-right parts of the array.
//
// Step 2:
// - First off we need to "realize" that the sum of number of elements of buckets are <= "n" (the elemno)!
// - Then we will realize soon that it is exactly "i" (the loop variable) in the sort loop!
// - This "hints" that we can do this alg in-place, or at least inplace numbers and some log(n) tree-descriptor
// - The already-processed part of the array (below ith elements) should be where we place data for the sp-tree buckets!
// - First there should be a single bucket - just like in the original. That is from the left of the array.
// - When we have more elements than 'm' (bucket max size) value, we need to split contents of the bucket in two smaller.
// - Then:
// -- We can use the "flag bit" to tell where the buckets end.
// -- Or can do a data structure that describes the tree.
// - Data we sort will stay in-place, but there is either runtime cost OR additional memory cost for the tree descriptor.
// The runtime cost happens on "inserting" into the space partitioning when we need to check the flags.
// - When "splitting" we need to split that bucket by xchgs to the first-last elements from two sides.
// - In beginning there is only one bucket and we check if we split that just by its size.
// - I feel there needs a log(n) data structure with from-to descriptors of the tree + children descriptor.
// - Maybe the "more-in-place" variant should be in its own file separately?
#endif /* SPSORT_H */