
perf: share encoder/reservation across PartitionedTopKExec partition …#23096

Merged
kosiew merged 4 commits into apache:main from SubhamSinghal:partitioned-topk-shared-state on Jun 29, 2026

@SubhamSinghal

Contributor

Which issue does this PR close?

Follow-up to #21479 (PartitionedTopKExec for ROW_NUMBER ... PARTITION BY ... LIMIT N) toward closing #6899.

Rationale for this change

PartitionedTopKExec currently maintains a HashMap<OwnedRow, TopK>, that is, one full TopK per distinct partition key. Each TopK carries its own RowConverter, a MemoryReservation registered with the runtime pool, TopKMetrics, and a scratch Rows buffer. With high partition cardinality, every partition seen for the first time pays for:

  • RowConverter::new (parses SortField list, allocates per-encoder state)
  • MemoryConsumer::register with the pool (involves a global lock)
  • per-counter TopKMetrics setup
  • scratch Rows::empty_rows allocation

For the h2o window-TopN sweep on a 10M-row CSV (partition key id3 % N, so N sets the partition cardinality), this shows up as a regression at ≥10K partitions: PartitionedTopKExec is slower than the unpartitioned SortExec baseline that it is meant to replace.

What changes are included in this PR?

Adds a PartitionedTopK sibling type to topk/mod.rs that holds the shared encoder/reservation/metrics state once at the operator level and a HashMap<OwnedRow, TopKHeap> of cheap per-partition heap state. PartitionedTopKExec switches from HashMap<OwnedRow, TopK> to one PartitionedTopK.
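The layout described above can be sketched with the standard library only. This is a minimal illustration, not the code in topk/mod.rs: `SharedState`, `PartitionHeap`, and `PartitionedTopKSketch` are hypothetical names, partition keys and sort values are plain `i32`s instead of encoded rows, and a byte counter stands in for the real memory reservation.

```rust
use std::collections::{BinaryHeap, HashMap};

/// Hypothetical stand-in for the operator-level state that is built once
/// (the PR names an encoder, a memory reservation, and metrics).
struct SharedState {
    k: usize,
    reserved_bytes: usize, // assumption: a simple counter instead of a pool reservation
    rows_seen: usize,      // assumption: a single metric counter
}

/// Hypothetical per-partition state: only a bounded max-heap holding the
/// k smallest values seen so far (ORDER BY val ASC).
#[derive(Default)]
struct PartitionHeap {
    heap: BinaryHeap<i32>,
}

struct PartitionedTopKSketch {
    shared: SharedState,
    heaps: HashMap<i32, PartitionHeap>,
}

impl PartitionedTopKSketch {
    fn new(k: usize) -> Self {
        Self {
            shared: SharedState { k, reserved_bytes: 0, rows_seen: 0 },
            heaps: HashMap::new(),
        }
    }

    /// Insert one (partition key, value) pair. A first-seen partition only
    /// allocates an empty heap; nothing shared is rebuilt.
    fn insert(&mut self, pk: i32, val: i32) {
        self.shared.rows_seen += 1;
        let k = self.shared.k;
        let part = self.heaps.entry(pk).or_default();
        if part.heap.len() < k {
            part.heap.push(val);
            self.shared.reserved_bytes += std::mem::size_of::<i32>();
        } else if let Some(mut worst) = part.heap.peek_mut() {
            // The heap is full: replace the current worst value if the new one beats it.
            if val < *worst {
                *worst = val;
            }
        }
    }

    /// Emit each partition's top-k values in ascending order, sorted by key.
    fn emit(self) -> Vec<(i32, Vec<i32>)> {
        let mut out: Vec<(i32, Vec<i32>)> = self
            .heaps
            .into_iter()
            .map(|(pk, part)| (pk, part.heap.into_sorted_vec()))
            .collect();
        out.sort_by_key(|(pk, _)| *pk);
        out
    }
}
```

In this sketch the cost of a new partition key is one `HashMap` entry plus an empty heap, which is the property the PR is after; the real operator additionally has to encode rows and account memory through the shared state.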

Bench results

Today's default (main, flag-off) vs this PR (flag-on)

| Partitions | main (flag-off) | this PR (flag-on) | Delta |
|---|---|---|---|
| 100 | 282 ms | 105 ms | 2.7x faster |
| 1,000 | 247 ms | 110 ms | 2.2x faster |
| 10,000 | 250 ms | 137 ms | 1.8x faster |
| 100,000 | 222 ms | 320 ms | 1.4x slower |

h2o id3 % N sweep, 10M-row CSV, 3 iterations per query, release build, enable_window_topn=true on both sides:

| Partitions | main | this PR | Speedup |
|---|---|---|---|
| 100 | 110 ms | 105 ms | ~1.0× |
| 1,000 | 117 ms | 110 ms | ~1.0× |
| 10,000 | 640 ms | 137 ms | 4.7× |
| 100,000 | 4,327 ms | 320 ms | 13.5× |

10K is the inflection point: on main it's a regression vs the sort baseline (640 ms vs 234 ms); after this PR it's a win (137 ms vs 234 ms, i.e. 1.7× faster than sort). At 100K, this PR nearly catches up to the sort baseline (320 ms vs 238 ms).

The enable_window_topn default stays false per the #21479 discussion: 100K+ partitions remain slower than sort on average, so this PR doesn't motivate flipping the default. It is a prerequisite for further optimizations that would attack the residual 100K+ cliff.

Are these changes tested?

Yes

Are there any user-facing changes?

No public API changes.

@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label Jun 22, 2026
@SubhamSinghal

Contributor Author

@kosiew can you review this PR?

@kosiew kosiew left a comment

Contributor

@SubhamSinghal
Thanks for the change! The implementation and the approach look good to me. I just have a couple of small, non-blocking suggestions.

/// Builds a `(pk Int32, val Int32)` schema and a `PartitionedTopK`
/// partitioned by `pk` with order `val ASC`. Helper for the
/// `PartitionedTopK` tests below.
fn build_partitioned_topk(k: usize) -> Result<(Arc<Schema>, PartitionedTopK)> {

Contributor

Nice addition to the test coverage. One thing that might be worth adding is a regression test covering ORDER BY val DESC and/or NULL sort values. This path relies on Arrow row encoding for sort direction and null ordering, so one extra SQL-level or unit test would help protect the shared-encoder rewrite across the main ordering variants used by window Top-N queries. Not blocking, since this reuses the existing build_sort_fields/RowConverter path and the current targeted tests pass.
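To make the concern concrete, here is a small sketch of how a byte-comparable key can carry both sort direction and null ordering, so that a plain byte comparison yields the requested order. It is a simplified illustration under stated assumptions: `SortOpts`, `encode_key`, and `sort_with` are hypothetical names, and the 5-byte layout is not claimed to match Arrow's actual row format.

```rust
/// Hypothetical per-column sort options (direction and null placement).
#[derive(Clone, Copy)]
struct SortOpts {
    descending: bool,
    nulls_first: bool,
}

/// Encode an optional i32 into 5 bytes whose lexicographic order matches `opts`.
/// Assumption: this layout is illustrative only, not Arrow's row encoding.
fn encode_key(v: Option<i32>, opts: SortOpts) -> [u8; 5] {
    // The leading byte places NULLs before or after every non-null value.
    let (null_byte, valid_byte) = if opts.nulls_first { (0u8, 1u8) } else { (1u8, 0u8) };
    let mut out = [0u8; 5];
    match v {
        None => out[0] = null_byte,
        Some(x) => {
            out[0] = valid_byte;
            // Flipping the sign bit makes big-endian bytes order like signed integers.
            let mut bytes = ((x as u32) ^ 0x8000_0000).to_be_bytes();
            if opts.descending {
                // Inverting every value byte reverses the order of non-null values.
                for b in bytes.iter_mut() {
                    *b = !*b;
                }
            }
            out[1..].copy_from_slice(&bytes);
        }
    }
    out
}

/// Sort values by comparing encoded keys only, never the original values.
fn sort_with(values: &[Option<i32>], opts: SortOpts) -> Vec<Option<i32>> {
    let mut sorted = values.to_vec();
    sorted.sort_by_key(|v| encode_key(*v, opts));
    sorted
}
```

Because the ordering lives entirely in the encoded bytes, a shared encoder built with the wrong options would silently reorder every partition, which is why a DESC or NULL-ordering test is a cheap safeguard.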

Contributor Author

Added unit tests covering nulls first, nulls last, and descending sort direction.

Comment thread benchmarks/queries/h2o/window.sql Outdated
SELECT id3 % 100000 AS pk, v2 AS largest2_v2,
ROW_NUMBER() OVER (PARTITION BY id3 % 100000 ORDER BY v2 DESC) AS order_v2
FROM large WHERE v2 IS NOT NULL
) sub_query WHERE order_v2 <= 2;
\ No newline at end of file

Contributor

Small formatting nit: could you add a trailing newline at EOF? No behavioral impact, just keeps the file consistent.

Contributor Author

addressed

@kosiew kosiew added this pull request to the merge queue Jun 29, 2026
Merged via the queue into apache:main with commit cae5f2e Jun 29, 2026
38 checks passed
@SubhamSinghal SubhamSinghal deleted the partitioned-topk-shared-state branch June 29, 2026 03:48