perf: share encoder/reservation across PartitionedTopKExec partition …#23096
Conversation
|
@kosiew can you review this PR? |
There was a problem hiding this comment.
@SubhamSinghal
Thanks for the change! The implementation and the approach looks good to me. I just have a couple of small, non-blocking suggestions.
| /// Builds a `(pk Int32, val Int32)` schema and a `PartitionedTopK` | ||
| /// partitioned by `pk` with order `val ASC`. Helper for the | ||
| /// `PartitionedTopK` tests below. | ||
| fn build_partitioned_topk(k: usize) -> Result<(Arc<Schema>, PartitionedTopK)> { |
There was a problem hiding this comment.
Nice addition to the test coverage. One thing that might be worth adding is a regression covering ORDER BY val DESC and/or NULL sort values. This path relies on Arrow row encoding for sort direction and null ordering, so having one extra SQL-level or unit test would help protect the shared encoder rewrite across the main ordering variants used by window Top-N queries. Not blocking since this reuses the existing build_sort_fields/RowConverter path and the current targeted tests pass.
There was a problem hiding this comment.
Added UT covering null first, null last and sort desc direction
| SELECT id3 % 100000 AS pk, v2 AS largest2_v2, | ||
| ROW_NUMBER() OVER (PARTITION BY id3 % 100000 ORDER BY v2 DESC) AS order_v2 | ||
| FROM large WHERE v2 IS NOT NULL | ||
| ) sub_query WHERE order_v2 <= 2; No newline at end of file |
There was a problem hiding this comment.
Small formatting nit: could you add a trailing newline at EOF? No behavioral impact, just keeps the file consistent.
Which issue does this PR close?
Follow-up to #21479 (
PartitionedTopKExecforROW_NUMBER ... PARTITION BY ... LIMIT N) toward closing #6899.Rationale for this change
PartitionedTopKExectoday maintains aHashMap<OwnedRow, TopK>— one fullTopKper distinct partition key. EachTopKcarries its ownRowConverter,MemoryReservationregistered with the runtime pool,TopKMetrics, and scratchRowsbuffer. With high partition cardinality every partition seen for the first time pays:RowConverter::new(parsesSortFieldlist, allocates per-encoder state)MemoryConsumer::registerwith the pool (involves a global lock)TopKMetricssetupRows::empty_rowsallocationFor the h2o window-TopN sweep on a 10M-row CSV (
id3 % Npartition cardinality), this shows up as a regression at ≥10K partitions —PartitionedTopKExecis slower than the unpartitionedSortExecbaseline that it's meant to replace.What changes are included in this PR?
Adds a
PartitionedTopKsibling type totopk/mod.rsthat holds the shared encoder/reservation/metrics state once at the operator level and aHashMap<OwnedRow, TopKHeap>of cheap per-partition heap state.PartitionedTopKExecswitches fromHashMap<OwnedRow, TopK>to onePartitionedTopK.Bench results
Today's default (main, flag-off) vs this PR (flag-on)
h2o
id3 % Nsweep, 10M-row CSV, 3 iterations per query, release build,enable_window_topn=trueon both sides:main10K is the inflection point: on
mainit's a regression vs the sort baseline (640 ms vs 234 ms); after this PR it's a win (137 ms vs 234 ms — 1.7× faster than sort). 100K nearly catches up to the sort baseline (320 ms vs 238 ms).enable_window_topndefault staysfalseper the #21479 discussion — 100K+ remains slower than sort on average, so this PR doesn't motivate flipping the default. It's the prerequisite for further optimizations that would attack the residual 100K+ cliff.Are these changes tested?
Yes
Are there any user-facing changes?
No public API changes.