From ebae0c7c103de5c8fdb11d765715b6c8dd13c390 Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen <4643818-rmlarsen1@users.noreply.gitlab.com> Date: Thu, 2 Apr 2026 22:40:03 -0700 Subject: [PATCH] ulp_accuracy: use dynamic work queue for thread load balancing libeigen/eigen!2383 Co-authored-by: Rasmus Munk Larsen --- test/ulp_accuracy/ulp_accuracy.cpp | 133 ++++++++++++++++------------- 1 file changed, 76 insertions(+), 57 deletions(-) diff --git a/test/ulp_accuracy/ulp_accuracy.cpp b/test/ulp_accuracy/ulp_accuracy.cpp index 9b427753e..8a8c62f05 100644 --- a/test/ulp_accuracy/ulp_accuracy.cpp +++ b/test/ulp_accuracy/ulp_accuracy.cpp @@ -20,6 +20,7 @@ #include #endif +#include #include #include #include @@ -256,41 +257,62 @@ static uint64_t count_scalars_in_range(Scalar lo, Scalar hi) { return diff == UINT64_MAX ? UINT64_MAX : diff + 1; } -// Advances a scalar by n ULPs in the linear representation. -static float advance_scalar(float x, uint64_t n) { - int64_t lin = scalar_to_linear(x); - lin += static_cast(n); - int32_t ibits; - if (lin < 0) { - ibits = static_cast(INT32_MIN) - static_cast(lin) - 1; - } else { - ibits = static_cast(lin); - } +// ============================================================================ +// Inverse of scalar_to_linear: maps a linear integer back to a scalar. +// ============================================================================ + +static float linear_to_scalar(int64_t lin, float /*tag*/) { + int32_t ibits = static_cast(lin); + if (ibits < 0) ibits = static_cast(INT32_MIN) - ibits - 1; float result; std::memcpy(&result, &ibits, sizeof(result)); return result; } -static double advance_scalar(double x, uint64_t n) { - int64_t lin = scalar_to_linear(x); - lin += static_cast(n); - int64_t ibits; - if (lin < 0) { - ibits = static_cast(INT64_MIN) - lin - 1; - } else { - ibits = lin; - } +static double linear_to_scalar(int64_t lin, double /*tag*/) { + int64_t ibits = lin; + if (ibits < 0) ibits = static_cast(INT64_MIN) - ibits - 1; double result; std::memcpy(&result, &ibits, sizeof(result)); return result; } // ============================================================================ -// Worker thread: evaluates Eigen and reference over a subrange +// Dynamic work queue: threads atomically claim chunks for load balancing // ============================================================================ template -static void worker(const FuncEntry& func, Scalar lo, Scalar hi, int batch_size, bool use_mpfr, double step_eps, +struct WorkQueue { + int64_t range_hi_lin; + int64_t chunk_size; + double step_eps; + std::atomic next_lin; + + void init(Scalar lo, Scalar hi, int64_t csz, double step) { + range_hi_lin = scalar_to_linear(hi); + chunk_size = csz; + step_eps = step; + next_lin.store(scalar_to_linear(lo), std::memory_order_relaxed); + } + + // Claim the next chunk. Returns false when no work remains. + bool claim(Scalar& chunk_lo, Scalar& chunk_hi) { + int64_t lo_lin = next_lin.fetch_add(chunk_size, std::memory_order_relaxed); + if (lo_lin > range_hi_lin) return false; + int64_t hi_lin = lo_lin + chunk_size - 1; + if (hi_lin > range_hi_lin) hi_lin = range_hi_lin; + chunk_lo = linear_to_scalar(lo_lin, Scalar(0)); + chunk_hi = linear_to_scalar(hi_lin, Scalar(0)); + return true; + } +}; + +// ============================================================================ +// Worker thread: dynamically claims chunks from a shared work queue +// ============================================================================ + +template +static void worker(const FuncEntry& func, WorkQueue& queue, int batch_size, bool use_mpfr, ThreadResult& result) { using ArrayType = Eigen::Array; ArrayType input(batch_size); @@ -326,29 +348,32 @@ static void worker(const FuncEntry& func, Scalar lo, Scalar hi, int batc } }; - int idx = 0; - Scalar x = lo; - for (;;) { - input[idx] = x; - idx++; + Scalar chunk_lo, chunk_hi; + while (queue.claim(chunk_lo, chunk_hi)) { + int idx = 0; + Scalar x = chunk_lo; + for (;;) { + input[idx] = x; + idx++; - if (idx == batch_size) { - func.eigen_eval(eigen_out, input); - process_batch(batch_size, input, eigen_out); - idx = 0; + if (idx == batch_size) { + func.eigen_eval(eigen_out, input); + process_batch(batch_size, input, eigen_out); + idx = 0; + } + + if (x >= chunk_hi) break; + Scalar next = advance_by_step(x, queue.step_eps); + x = (next > chunk_hi) ? chunk_hi : next; } - if (x >= hi) break; - Scalar next = advance_by_step(x, step_eps); - x = (next > hi) ? hi : next; - } - - // Process remaining partial batch. Pad unused slots with the last valid - // input so the full-size vectorized eval doesn't read uninitialized memory. - if (idx > 0) { - for (int i = idx; i < batch_size; i++) input[i] = input[idx - 1]; - func.eigen_eval(eigen_out, input); - process_batch(idx, input, eigen_out); + // Process remaining partial batch. Pad unused slots with the last valid + // input so the full-size vectorized eval doesn't read uninitialized memory. + if (idx > 0) { + for (int i = idx; i < batch_size; i++) input[i] = input[idx - 1]; + func.eigen_eval(eigen_out, input); + process_batch(idx, input, eigen_out); + } } #ifdef EIGEN_HAS_MPFR @@ -424,10 +449,6 @@ static int run_test(const Options& opts) { std::printf("\n"); std::fflush(stdout); - // Split range across threads. - if (total_scalars > 0 && static_cast(num_threads) > total_scalars) { - num_threads = static_cast(total_scalars); - } if (num_threads < 1) num_threads = 1; // Heap-allocate each ThreadResult separately to avoid false sharing. @@ -438,22 +459,20 @@ static int run_test(const Options& opts) { results.back()->init(opts.hist_width); } - std::vector threads; - uint64_t scalars_per_thread = total_scalars / num_threads; - Scalar chunk_lo = lo; + // Use dynamic work distribution: threads claim small chunks from a shared + // queue. This ensures even load balancing regardless of how per-value + // work varies across the range (e.g. log on negatives is trivial). + // Choose chunk_size so we get ~16 chunks per thread for good balancing. + int64_t chunk_size = std::max(int64_t(1), static_cast(total_scalars / (num_threads * 16))); + WorkQueue queue; + queue.init(lo, hi, chunk_size, opts.step_eps); + std::vector threads; auto start_time = std::chrono::steady_clock::now(); for (int t = 0; t < num_threads; t++) { - Scalar chunk_hi; - if (t == num_threads - 1) { - chunk_hi = hi; - } else { - chunk_hi = advance_scalar(chunk_lo, scalars_per_thread - 1); - } - threads.emplace_back(worker, std::cref(*entry), chunk_lo, chunk_hi, opts.batch_size, opts.use_mpfr, - opts.step_eps, std::ref(*results[t])); - chunk_lo = std::nextafter(chunk_hi, std::numeric_limits::infinity()); + threads.emplace_back(worker, std::cref(*entry), std::ref(queue), opts.batch_size, opts.use_mpfr, + std::ref(*results[t])); } for (auto& t : threads) t.join();