feat: add executor pool support#687
Conversation
27790b7 to
6694105
Compare
wgtmac
left a comment
There was a problem hiding this comment.
Nice work. The abstraction is clean and the test coverage is solid. Since the default executor is nullopt, existing paths stay single-threaded, so the risk is well contained.
A couple of things before merge:
- The PR has no description. Worth adding the motivation and a short design note, ideally with the parallel/serial numbers that justify it.
- The
RetryRunnerchange from a runtime fluent API (OnlyRetryOn/StopRetryOn) to the compile-timeRetryRunner<Policy>is a breaking API change that's independent of executor support. Consider splitting it into its own PR so it gets reviewed on its own merits.
Left a few inline notes.
wgtmac
left a comment
There was a problem hiding this comment.
A few more minor notes, non-blocking.
wgtmac
left a comment
There was a problem hiding this comment.
Another pass, this time on the interface design and extensibility.
The overall shape is good: one virtual Executor that engines implement, threaded through the builders via PlanWith. The Arrow adapter test is a nice proof that an external pool drops in with basically one line, so "bring your own threadpool" is well covered.
On future async directions (C++23 coroutines, P2300 std::execution): the model here is synchronous and blocking, TaskGroup::Run() fans out and blocks on std::future::get(), and the planning APIs return Result<...> directly. So this is a parallel-for primitive, not a step toward a sender/receiver pipeline. That's a reasonable scope for now, I'd just flag it explicitly so nobody expects this interface to extend into async later, it'll be a separate one. Details inline.
wgtmac
left a comment
There was a problem hiding this comment.
One more pass. The main thing I found is a documentation gap around a new concurrency contract that now leaks onto user-supplied callbacks.
Once an executor is set, the user's ManifestWriterFactory and the shared FileIO get called from multiple worker threads. The tests already account for this (the factories use an atomic counter plus a barrier), so the requirement is understood, it's just not written down anywhere a downstream engine would see it. Worth documenting on the public surface.
| */ | ||
|
|
||
| // Borrowed the file from Apache Arrow: | ||
| // https://github.com/apache/arrow/blob/main/cpp/src/arrow/util/functional.h |
Rebases onto apache#687's executor pool support per @wgtmac's review: * FileCleanupStrategy now takes an OptionalExecutor in its constructor. When the custom DeleteWith() callback is configured, per-path deletes fan out through a TaskGroup that uses the executor (or runs serially when none is set, preserving prior behavior). * The FileIO bulk delete path wraps file_io_->DeleteFiles in a tight RetryRunner<retry::StopRetryOn<kNotFound>> so transient FileIO errors no longer give up after the first attempt. Mirrors Java's Tasks.foreach(...).stopRetryOn(NotFoundException.class).retry(N). * Adds ExpireSnapshots::Executor(OptionalExecutor) so callers can opt in to parallel deletion, and threads it down through both IncrementalFileCleanup and ReachableFileCleanup. * Drops the std::async / std::thread / std::span machinery and the ad-hoc retry loop -- replaced by util/task_group.h and util/retry_util.h from apache#687. Adds ExecutorDispatchesDeletesConcurrently test that wires a test::ThreadExecutor through the new API and asserts the executor received one submission per file.
Rebases onto apache#687's executor pool support per @wgtmac's review: * FileCleanupStrategy now takes an OptionalExecutor in its constructor. When the custom DeleteWith() callback is configured, per-path deletes fan out through a TaskGroup that uses the executor (or runs serially when none is set, preserving prior behavior). * The FileIO bulk delete path wraps file_io_->DeleteFiles in a tight RetryRunner<retry::StopRetryOn<kNotFound>> so transient FileIO errors no longer give up after the first attempt. Mirrors Java's Tasks.foreach(...).stopRetryOn(NotFoundException.class).retry(N). * Adds ExpireSnapshots::Executor(OptionalExecutor) so callers can opt in to parallel deletion, and threads it down through both IncrementalFileCleanup and ReachableFileCleanup. * Drops the std::async / std::thread / std::span machinery and the ad-hoc retry loop -- replaced by util/task_group.h and util/retry_util.h from apache#687. Adds ExecutorDispatchesDeletesConcurrently test that wires a test::ThreadExecutor through the new API and asserts the executor received one submission per file.
No description provided.