ulp_accuracy: use dynamic work queue for thread load balancing

libeigen/eigen!2383

Co-authored-by: Rasmus Munk Larsen <rmlarsen@gmail.com>
This commit is contained in:
Rasmus Munk Larsen
2026-04-02 22:40:03 -07:00
parent 5977635d64
commit ebae0c7c10

View File

@@ -20,6 +20,7 @@
#include <mpfr.h>
#endif
#include <atomic>
#include <cfloat>
#include <chrono>
#include <cmath>
@@ -256,41 +257,62 @@ static uint64_t count_scalars_in_range(Scalar lo, Scalar hi) {
return diff == UINT64_MAX ? UINT64_MAX : diff + 1;
}
// Advances a scalar by n ULPs in the linear representation.
static float advance_scalar(float x, uint64_t n) {
int64_t lin = scalar_to_linear(x);
lin += static_cast<int64_t>(n);
int32_t ibits;
if (lin < 0) {
ibits = static_cast<int32_t>(INT32_MIN) - static_cast<int32_t>(lin) - 1;
} else {
ibits = static_cast<int32_t>(lin);
}
// ============================================================================
// Inverse of scalar_to_linear: maps a linear integer back to a scalar.
// ============================================================================
static float linear_to_scalar(int64_t lin, float /*tag*/) {
int32_t ibits = static_cast<int32_t>(lin);
if (ibits < 0) ibits = static_cast<int32_t>(INT32_MIN) - ibits - 1;
float result;
std::memcpy(&result, &ibits, sizeof(result));
return result;
}
static double advance_scalar(double x, uint64_t n) {
int64_t lin = scalar_to_linear(x);
lin += static_cast<int64_t>(n);
int64_t ibits;
if (lin < 0) {
ibits = static_cast<int64_t>(INT64_MIN) - lin - 1;
} else {
ibits = lin;
}
static double linear_to_scalar(int64_t lin, double /*tag*/) {
int64_t ibits = lin;
if (ibits < 0) ibits = static_cast<int64_t>(INT64_MIN) - ibits - 1;
double result;
std::memcpy(&result, &ibits, sizeof(result));
return result;
}
// ============================================================================
// Worker thread: evaluates Eigen and reference over a subrange
// Dynamic work queue: threads atomically claim chunks for load balancing
// ============================================================================
template <typename Scalar>
static void worker(const FuncEntry<Scalar>& func, Scalar lo, Scalar hi, int batch_size, bool use_mpfr, double step_eps,
struct WorkQueue {
int64_t range_hi_lin;
int64_t chunk_size;
double step_eps;
std::atomic<int64_t> next_lin;
void init(Scalar lo, Scalar hi, int64_t csz, double step) {
range_hi_lin = scalar_to_linear(hi);
chunk_size = csz;
step_eps = step;
next_lin.store(scalar_to_linear(lo), std::memory_order_relaxed);
}
// Claim the next chunk. Returns false when no work remains.
bool claim(Scalar& chunk_lo, Scalar& chunk_hi) {
int64_t lo_lin = next_lin.fetch_add(chunk_size, std::memory_order_relaxed);
if (lo_lin > range_hi_lin) return false;
int64_t hi_lin = lo_lin + chunk_size - 1;
if (hi_lin > range_hi_lin) hi_lin = range_hi_lin;
chunk_lo = linear_to_scalar(lo_lin, Scalar(0));
chunk_hi = linear_to_scalar(hi_lin, Scalar(0));
return true;
}
};
// ============================================================================
// Worker thread: dynamically claims chunks from a shared work queue
// ============================================================================
template <typename Scalar>
static void worker(const FuncEntry<Scalar>& func, WorkQueue<Scalar>& queue, int batch_size, bool use_mpfr,
ThreadResult<Scalar>& result) {
using ArrayType = Eigen::Array<Scalar, Eigen::Dynamic, 1>;
ArrayType input(batch_size);
@@ -326,29 +348,32 @@ static void worker(const FuncEntry<Scalar>& func, Scalar lo, Scalar hi, int batc
}
};
int idx = 0;
Scalar x = lo;
for (;;) {
input[idx] = x;
idx++;
Scalar chunk_lo, chunk_hi;
while (queue.claim(chunk_lo, chunk_hi)) {
int idx = 0;
Scalar x = chunk_lo;
for (;;) {
input[idx] = x;
idx++;
if (idx == batch_size) {
func.eigen_eval(eigen_out, input);
process_batch(batch_size, input, eigen_out);
idx = 0;
if (idx == batch_size) {
func.eigen_eval(eigen_out, input);
process_batch(batch_size, input, eigen_out);
idx = 0;
}
if (x >= chunk_hi) break;
Scalar next = advance_by_step(x, queue.step_eps);
x = (next > chunk_hi) ? chunk_hi : next;
}
if (x >= hi) break;
Scalar next = advance_by_step(x, step_eps);
x = (next > hi) ? hi : next;
}
// Process remaining partial batch. Pad unused slots with the last valid
// input so the full-size vectorized eval doesn't read uninitialized memory.
if (idx > 0) {
for (int i = idx; i < batch_size; i++) input[i] = input[idx - 1];
func.eigen_eval(eigen_out, input);
process_batch(idx, input, eigen_out);
// Process remaining partial batch. Pad unused slots with the last valid
// input so the full-size vectorized eval doesn't read uninitialized memory.
if (idx > 0) {
for (int i = idx; i < batch_size; i++) input[i] = input[idx - 1];
func.eigen_eval(eigen_out, input);
process_batch(idx, input, eigen_out);
}
}
#ifdef EIGEN_HAS_MPFR
@@ -424,10 +449,6 @@ static int run_test(const Options& opts) {
std::printf("\n");
std::fflush(stdout);
// Split range across threads.
if (total_scalars > 0 && static_cast<uint64_t>(num_threads) > total_scalars) {
num_threads = static_cast<int>(total_scalars);
}
if (num_threads < 1) num_threads = 1;
// Heap-allocate each ThreadResult separately to avoid false sharing.
@@ -438,22 +459,20 @@ static int run_test(const Options& opts) {
results.back()->init(opts.hist_width);
}
std::vector<std::thread> threads;
uint64_t scalars_per_thread = total_scalars / num_threads;
Scalar chunk_lo = lo;
// Use dynamic work distribution: threads claim small chunks from a shared
// queue. This ensures even load balancing regardless of how per-value
// work varies across the range (e.g. log on negatives is trivial).
// Choose chunk_size so we get ~16 chunks per thread for good balancing.
int64_t chunk_size = std::max(int64_t(1), static_cast<int64_t>(total_scalars / (num_threads * 16)));
WorkQueue<Scalar> queue;
queue.init(lo, hi, chunk_size, opts.step_eps);
std::vector<std::thread> threads;
auto start_time = std::chrono::steady_clock::now();
for (int t = 0; t < num_threads; t++) {
Scalar chunk_hi;
if (t == num_threads - 1) {
chunk_hi = hi;
} else {
chunk_hi = advance_scalar(chunk_lo, scalars_per_thread - 1);
}
threads.emplace_back(worker<Scalar>, std::cref(*entry), chunk_lo, chunk_hi, opts.batch_size, opts.use_mpfr,
opts.step_eps, std::ref(*results[t]));
chunk_lo = std::nextafter(chunk_hi, std::numeric_limits<Scalar>::infinity());
threads.emplace_back(worker<Scalar>, std::cref(*entry), std::ref(queue), opts.batch_size, opts.use_mpfr,
std::ref(*results[t]));
}
for (auto& t : threads) t.join();