Skip to content

[branch-53] ensure dynamic filters are correctly pushed down through aggregations…#123

Closed
jayshrivastava wants to merge 1 commit into
branch-53from
js/cherry-pick-dynamic-filter-pushdown-through-aggregations
Closed

[branch-53] ensure dynamic filters are correctly pushed down through aggregations…#123
jayshrivastava wants to merge 1 commit into
branch-53from
js/cherry-pick-dynamic-filter-pushdown-through-aggregations

Conversation

@jayshrivastava

@jayshrivastava jayshrivastava commented May 19, 2026

Copy link
Copy Markdown

Cherry-pick of apache#21059

…apache#21059)

- Closes apache#21065.

In plans such as the following, dynamic filters are not pushed down
through the aggregation
```
CREATE TABLE data (a VARCHAR, ts TIMESTAMP, value DOUBLE)
    AS VALUES
      ('h1', '2024-01-01T00:05:00', 1.0),
      ('h1', '2024-01-01T00:15:00', 2.0),
      ('h2', '2024-01-01T00:25:00', 3.0),
      ('h3', '2024-01-01T00:35:00', 4.0);

SELECT * FROM contexts c
  INNER JOIN (
    SELECT a, date_bin(interval '1 hour', ts) AS bucket, min(value) AS min_val
    FROM (SELECT value, a, ts FROM data)
    GROUP BY a, date_bin(interval '1 hour', ts)
  ) agg ON c.a = agg.a;
```

```
    HashJoinExec: mode=Auto, join_type=Inner, on=[(a@0, a@0)]
      DataSourceExec: partitions=1
      ProjectionExec: [a@0, date_bin(1h, ts)@1 as bucket, min(value)@2 as min_val]
        AggregateExec: mode=FinalPartitioned, gby=[a@0, date_bin(1h, ts)@1], aggr=[min(value)]
          AggregateExec: mode=Partial, gby=[a@1, date_bin(1h, ts@2)], aggr=[min(value)]
            ProjectionExec: [value@2, a@0, ts@1]        ← reorders columns
              DataSourceExec: partitions=1
```

`AggregateExec::gather_filters_for_pushdown` compared parent filter
columns (output schema indices) against grouping expression columns
(input schema indices). When a `ProjectionExec` below the aggregate
reorders columns, the index mismatch causes filters (such as HashJoin
dynamic filters) to be incorrectly blocked.

This change fixes the column index mapping in
`AggregateExec::gather_filters_for_pushdown`

- `test_pushdown_through_aggregate_with_reordered_input_columns` —
filter on grouping column with reordered input is pushed down
-
`test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result`
— filter on aggregate result column is not pushed down
- `test_pushdown_through_aggregate_grouping_sets_with_reordered_input` —
GROUPING SETS: filter on common column pushed, filter on missing column
blocked
-
`test_hashjoin_dynamic_filter_pushdown_through_aggregate_with_reordered_input`
— HashJoin dynamic filter pushes through aggregate with reordered input
and is populated with values after
   execution
  - All tests verified to fail without the fix

No.
@datadog-official

Copy link
Copy Markdown

Pipelines

Fix all issues with BitsAI

⚠️ Warnings

🚦 1 Pipeline job failed

Rust | build and run with wasm-pack   View in Datadog   GitHub Actions

🛟 This job is unlikely to succeed on retry. Please review your pipeline configuration. Failed to download wasm-pack: 404 Not Found error from source URL.

Useful? React with 👍 / 👎

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: 114139c | Docs | Datadog PR Page | Give us feedback!

@jayshrivastava jayshrivastava changed the title ensure dynamic filters are correctly pushed down through aggregations… [branch-53] ensure dynamic filters are correctly pushed down through aggregations… May 19, 2026
LiaCastaneda added a commit that referenced this pull request May 22, 2026
apache#22453) (#126)

## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->

- Closes #.

## Rationale for this change

When the substrait consumer hits an `Aggregate` with two identical
measures (e.g. `sum(a)` present twice), planning fails with `Schema
contains duplicate unqualified field name`. Substrait carries column
names at the plan root rather than on the measures themselves, so the
measures arrive at `Aggregate` schema construction without aliases --
and two identical exprs produce two identical field names. PR apache#20539
fixed the `NameTracker` to dedupe duplicate names in the consumer, but
it was only applied to grouping expressions, not to the measures.

The planner sees:

```
field 1: (qualifier: None, name: "sum(data.a)")
field 2: (qualifier: None, name: "sum(data.a)")
```

which is rejected when constructing the Aggregate's output schema.

## What changes are included in this PR?

Run aggregate measures through the same `NameTracker` like the grouping
expressions in `from_aggregate_rel`

## Are these changes tested?

Yes -- added a roundtrip test `aggregate_identical_measures`. Without
the fix it produces `Error: SchemaError(DuplicateUnqualifiedField {
name: "sum(data.a)" }, Some(""))`

## Are there any user-facing changes?

No.

(cherry picked from commit 097efae)
LiaCastaneda added a commit that referenced this pull request May 26, 2026
## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->

- Closes #.

## Rationale for this change

`array_any_match` is a commonly supported higher-order function in
systems like Spark (`exists`), Trino (`any_match`) among other engines.
It seems like a natural first addition alongside `array_transform` and
worth upstreaming I think.


## What changes are included in this PR?

Adds `array_any_match(array, predicate)` as a new higher-order function
(with aliases `any_match` and `list_any_match`). It returns:

`true` if any element satisfies the predicate
`false` if no element does (including empty arrays)
`null` if the predicate returns null for some elements and false for all
others

## Are these changes tested?

Yes I added unit tests and sqllogic tests

## Are there any user-facing changes?

Yes -- new SQL functions array_any_match, any_match, and list_any_match
are available.

---------

Co-authored-by: Dmitrii Blaginin <dmitrii@blaginin.me>
gabotechs pushed a commit to ajegou/datafusion that referenced this pull request Jun 12, 2026
…apache#22727)

## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes DataDog#123` indicates that this PR will close issue DataDog#123.
-->

- Closes apache#22726.

## Rationale for this change

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

The regex simplification rule rewrites anchored regex matches
(`^literal$`, `^(a|b)$`) into cheaper `=` / `IN` / `LIKE` expressions.
Two bugs in that path:

1. The literal was always built as `Utf8` via `lit(...)`, so on a
`Utf8View` / `LargeUtf8` column the rewritten comparison failed at
execution with `Invalid comparison operation: Utf8View == Utf8`.
2. A `~*` (case-insensitive) anchored literal was rewritten to a
case-sensitive `=`, silently dropping rows that differ only in case.

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

- Build the extracted literal with `string_scalar.to_expr(...)` so its
type follows the column type (`Utf8` / `LargeUtf8` / `Utf8View`),
consistent with the existing `LIKE` branches.
- Rewrite `~*` anchored literals to `ILIKE` instead of `=`. The existing
`is_safe_for_like` guard ensures the literal has no `%` / `_`, so this
is an exact case-insensitive match. (Anchored alternations under `~*`
still fall back to regex evaluation.)

## Are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->

Yes. `predicates.slt` now covers anchored `~` / `~*`, single literals
and alternations, over both `Utf8` and `Utf8View` columns. Existing
`regex.rs` unit tests still pass.

## Are there any user-facing changes?

Yes, bug fixes only

<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->

<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
gabotechs pushed a commit to ajegou/datafusion that referenced this pull request Jun 12, 2026
## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes DataDog#123` indicates that this PR will close issue DataDog#123.
-->

- Closes apache#22826 .
- Closes apache#22490
- Closes apache#11108 

## Rationale for this change
  
SQL (per PG and IEEE 754) treats `+0.0` and `-0.0` as equal in `=`, `IS
DISTINCT FROM`, DISTINCT, GROUP BY, UNION/INTERSECT/EXCEPT, equi-joins,
and `array_*` set ops. DataFusion treated them as
  distinct because:

- Arrow's `cmp::eq`/`gt`/`lt` use IEEE 754 totalOrder for floats —
`arrow-ord-58.3.0/src/cmp.rs:71-75` explicitly says *"please normalize
zeros before calling this kernel"*.
- Arrow's `RowConverter` row-encodes floats with totalOrder; ±0 produce
different bytes.
- DataFusion's primitive float hashing used raw `to_bits()` /
`to_ne_bytes()`, so ±0 hashed to different buckets.

  ## What changes are included in this PR?

  **Helper** in `datafusion/common/src/utils/mod.rs`:
- `normalize_float_zero(&ArrayRef) -> ArrayRef` — rewrites `-0.0 → +0.0`
for Float16/32/64 via `PrimitiveArray::unary`; `Arc::clone` for
non-float. NaN payloads preserved (`bits << 1 == 0` matches only
  ±0).
- `normalize_float_zero_scalar(ScalarValue) -> ScalarValue` — symmetric
for scalars.

**Applied at six boundary sites** where DataFusion hands float data to
Arrow:
  | Site | Fixes |
  |---|---|
| `physical-expr-common/src/datum.rs::apply_cmp` | BinaryExpr `=`, `<`,
`>`, `IS DISTINCT FROM` |
| `physical-plan/src/joins/utils.rs::eq_dyn_null` | HashJoin row
equality → INNER JOIN, INTERSECT, EXCEPT |
| `physical-plan/src/aggregates/group_values/row.rs::intern` |
Multi-column row-encoded GROUP BY |
| `functions-nested/src/set_ops.rs::general_array_distinct` |
`array_distinct` |
| `functions-nested/src/set_ops.rs::generic_set_lists` | `array_union`,
`array_intersect` |
  | `functions-nested/src/except.rs::general_except` | `array_except` |

  **Float hash macros** normalize for consistency:
- `datafusion/common/src/hash_utils.rs::hash_float_value!` —
`create_hashes`, hash joins, shuffle.
-
`datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs::hash_float!`
— single-column primitive GROUP BY fast path.

**Single-column primitive GROUP BY / DISTINCT**
(`GroupValuesPrimitive::intern`,
`PrimitiveGroupValueBuilder::{append_val,vectorized_append,equal_to,vectorized_equal_to_*}`)
canonicalize the input via
a new default-identity `canonicalize` method on the local `HashValue`
trait (float override only). Trait visibility lifted to `pub` so the
multi-column file can use it.
gabotechs pushed a commit to ajegou/datafusion that referenced this pull request Jun 12, 2026
## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes DataDog#123` indicates that this PR will close issue DataDog#123.
-->

- Closes apache#22765 .

## Rationale for this change

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

## Are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->

## Are there any user-facing changes?

<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->

<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
gabotechs pushed a commit to ajegou/datafusion that referenced this pull request Jun 12, 2026
…new hash aggregation impl (apache#22824)

## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes DataDog#123` indicates that this PR will close issue DataDog#123.
-->

- part of apache#22710

## Rationale for this change

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
Part of rewriting hash aggregation into several dedicated streams.

In the first step apache#22729,
`PartialHashAggregateStream` and `FinalHashAggregateStream` has been
split from the old `GroupsHashAggregateStream`, but both stream only
have basic implementation, no optimizations and extra features like
spilling.
\* it's incremental migration, so old impl won't change, we plan to
delete it once migration is finished

This PR forward ports the below optimization to the new implementation:
- apache#8038

The optimizer part don't have to move, ported changes are only inside
aggregate operator.

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
Extends `PartialHashAggregateStream` and `FinalHashAggregateStream` to
apply the optimization. See code comment at
`datafusion/physical-plan/src/aggregates/hash_aggregate.rs` for the
background.

## Are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Yes, the original test in apache#8038
is only at `ExecutionPlan` level, they're still passing after the
change.
This PR added new test coverage: check `explain analyze` to ensure the
implementation actually respects this soft limit at runtime.

## Are there any user-facing changes?

<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->

<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->

---------

Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
gabotechs pushed a commit to ajegou/datafusion that referenced this pull request Jun 12, 2026
## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes DataDog#123` indicates that this PR will close issue DataDog#123.
-->

- Closes apache#22778.
- Related: apache#21992, apache#22395.
- Needed by apache#22657.

## Rationale for this change

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

Declared scan output partitioning should use logical partitioning
metadata, not physical partitioning types. This adds logical range
partitioning so range-partitioned sources can declare their layout at
the logical layer.

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

- Add logical `Partitioning::Range` and `RangePartitioning`.
- Move `SplitPoint` and shared split-point validation to
`datafusion-common`.
- Wire logical range partitioning through expression traversal,
rewrites, and display.
- Keep planning, logical proto, and Substrait support explicitly
unsupported for now.

## Are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->

Yes. Unit tests added

## Are there any user-facing changes?

<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->

Yes. This adds public logical range partitioning API. No breaking API
changes.

<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
gabotechs pushed a commit to ajegou/datafusion that referenced this pull request Jun 12, 2026
## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes DataDog#123` indicates that this PR will close issue DataDog#123.
-->

- Closes apache#22818.

## Rationale for this change

DataFusion kept SQL dialect metadata in several separate places, which
made it easy for enum variants, canonical names, aliases, display names,
docs, and error messages to drift.

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

## What changes are included in this PR?

  - Centralize dialect metadata in one macro-driven source.
- Generate the `Dialect` enum, metadata table, canonical names, aliases,
display names, and available-dialect string from that source.
  - Replace `Dialect::AVAILABLE` with `Dialect::available()`.
  - Add `Dialect::metadata()` for iterating supported dialects.
- Update dialect error messages and generated config docs to use the
centralized metadata.
  - Add an upgrade note for the removed public constant.

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

## Are these changes tested?

Yes

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->

## Are there any user-facing changes?

Yes. `Dialect::AVAILABLE` is removed and replaced by
`Dialect::available()`.

<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->

<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
gabotechs pushed a commit to ajegou/datafusion that referenced this pull request Jun 12, 2026
…che#22790)

## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes DataDog#123` indicates that this PR will close issue DataDog#123.
-->

- Closes #.

## Rationale for this change

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

`array_remove` errored out when given a `NULL` array argument.

`array_replace` handled the `DataType::Null` case, but it always built a
NULL array of length 1, regardless of the actual input length.

`remove` and `replace` share nearly identical implementations, so they
should treat a NULL array argument the same way

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

- `array_remove`: handle `DataType::Null` in both
`array_remove_internal` and the scalar fast-path
`array_remove_with_scalar_args`, returning a NULL array of the input
length.
- `array_replace`: fix the existing `DataType::Null` branches to use the
input array length (`array.len()` / `list_array.len()`) instead of a
hard-coded `1`.

## Are these changes tested?

Yes, added slt coverage

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->

## Are there any user-facing changes?

`array_remove`/`array_remove_n`/`array_remove_all` now return `NULL` for
a `NULL`-typed array argument instead of raising an error. No breaking
API changes.

<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->

<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
gabotechs pushed a commit to ajegou/datafusion that referenced this pull request Jun 12, 2026
## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes DataDog#123` indicates that this PR will close issue DataDog#123.
-->

- Related to apache#22882.

## Rationale for this change

I opened a new roadmap discussion, [DataFusion 2026 Q3-Q4 Roadmap
Discussion](apache#22882), and it
would be nice if public-facing docs pointed readers at it so the
community can find and join the current discussion.

## What changes are included in this PR?

- Add a roadmap pointer to the `README.md` "Contributing to DataFusion"
section linking to the roadmap docs and the current discussion (apache#22882).
- Update the contributor-guide roadmap page
(`docs/source/contributor-guide/roadmap.md`) to call out apache#22882 as the
current discussion and list it at the top of the quarterly roadmap
discussions.

## Are these changes tested?

No tests; documentation-only change.

## Are there any user-facing changes?

Documentation only — adds links to the current roadmap discussion.

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant