diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 942432239612e..6e5c9641d1422 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -59,7 +59,7 @@ use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; -use datafusion_physical_expr::Distribution; +use datafusion_physical_expr::{Distribution, RangePartitioning, SplitPoint}; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::ExecutionPlan; use datafusion_physical_plan::expressions::col; @@ -70,7 +70,8 @@ use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, displayable, + DisplayAs, DisplayFormatType, ExecutionPlanProperties, Partitioning, PlanProperties, + displayable, }; use insta::Settings; @@ -325,6 +326,50 @@ fn parquet_exec_multiple_sorted( DataSourceExec::from_data_source(config) } +fn parquet_exec_with_output_partitioning( + output_partitioning: Partitioning, +) -> Arc { + let file_groups = (0..output_partitioning.partition_count()) + .map(|idx| FileGroup::new(vec![PartitionedFile::new(format!("p{idx}"), 100)])) + .collect(); + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::new(ParquetSource::new(schema())), + ) + .with_file_groups(file_groups) + .with_output_partitioning(Some(output_partitioning)) + .build(); + + DataSourceExec::from_data_source(config) +} + +fn range_partitioning( + key: &str, + split_points: impl IntoIterator, + options: SortOptions, +) -> Result { + let ordering = [PhysicalSortExpr { + expr: col(key, &schema())?, + options, + }] + .into(); + let split_points = split_points + .into_iter() + .map(|value| SplitPoint::new(vec![ScalarValue::Int64(Some(value))])) + .collect(); + Ok(Partitioning::Range(RangePartitioning::try_new( + ordering, + split_points, + )?)) +} + +fn hash_partitioning(key: &str, partition_count: usize) -> Result { + Ok(Partitioning::Hash( + vec![col(key, &schema())?], + partition_count, + )) +} + fn csv_exec() -> Arc { csv_exec_with_sort(vec![]) } @@ -700,6 +745,135 @@ impl TestConfig { } } +fn partitioned_join_plan( + left_partitioning: Partitioning, + right_partitioning: Partitioning, + join_type: JoinType, +) -> Arc { + let left = parquet_exec_with_output_partitioning(left_partitioning); + let right = parquet_exec_with_output_partitioning(right_partitioning); + let join_on = vec![( + Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _, + )]; + hash_join_exec(left, right, &join_on, &join_type) +} + +#[test] +fn inner_range_join_keeps_range_partitioning() -> Result<()> { + let join = partitioned_join_plan( + range_partitioning("a", [10, 20], SortOptions::default())?, + range_partitioning("b", [10, 20], SortOptions::default())?, + JoinType::Inner, + ); + + let plan = TestConfig::default().to_plan(join, &DISTRIB_DISTRIB_SORT); + + assert_plan!( + plan, + @r" + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)] + DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20)], 3), file_type=parquet + DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([b@1 ASC], [(10), (20)], 3), file_type=parquet + " + ); + + Ok(()) +} + +#[test] +fn inner_range_join_rehashes_different_bounds() -> Result<()> { + let join = partitioned_join_plan( + range_partitioning("a", [10, 20], SortOptions::default())?, + range_partitioning("b", [15, 20], SortOptions::default())?, + JoinType::Inner, + ); + + let plan = TestConfig::default().to_plan(join, &DISTRIB_DISTRIB_SORT); + + assert_plan!( + plan, + @r" + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=3 + DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20)], 3), file_type=parquet + RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=3 + DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([b@1 ASC], [(15), (20)], 3), file_type=parquet + " + ); + + Ok(()) +} + +#[test] +fn inner_hash_join_rehashes_mismatched_counts() -> Result<()> { + let join = partitioned_join_plan( + hash_partitioning("a", 11)?, + hash_partitioning("b", 12)?, + JoinType::Inner, + ); + + let plan = TestConfig::default().to_plan(join, &DISTRIB_DISTRIB_SORT); + + assert_plan!( + plan, + @r" + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=11 + DataSourceExec: file_groups={11 groups: [[p0], [p1], [p2], [p3], [p4], [p5], [p6], [p7], [p8], [p9], [p10]]}, projection=[a, b, c, d, e], output_partitioning=Hash([a@0], 11), file_type=parquet + RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=12 + DataSourceExec: file_groups={12 groups: [[p0], [p1], [p2], [p3], [p4], [p5], [p6], [p7], [p8], [p9], [p10], [p11]]}, projection=[a, b, c, d, e], output_partitioning=Hash([b@1], 12), file_type=parquet + " + ); + Ok(()) +} + +#[test] +fn inner_hash_join_rehashes_to_target_count() -> Result<()> { + let join = partitioned_join_plan( + hash_partitioning("a", 3)?, + hash_partitioning("b", 3)?, + JoinType::Inner, + ); + + let plan = TestConfig::default().to_plan(join, &DISTRIB_DISTRIB_SORT); + + assert_plan!( + plan, + @r" + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=3 + DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Hash([a@0], 3), file_type=parquet + RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=3 + DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Hash([b@1], 3), file_type=parquet + " + ); + Ok(()) +} + +#[test] +fn non_inner_range_join_rehashes() -> Result<()> { + let join = partitioned_join_plan( + range_partitioning("a", [10, 20], SortOptions::default())?, + range_partitioning("b", [10, 20], SortOptions::default())?, + JoinType::Left, + ); + + let plan = TestConfig::default().to_plan(join, &DISTRIB_DISTRIB_SORT); + + assert_plan!( + plan, + @r" + HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, b@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=3 + DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20)], 3), file_type=parquet + RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=3 + DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([b@1 ASC], [(10), (20)], 3), file_type=parquet + " + ); + Ok(()) +} + #[test] fn multi_hash_joins() -> Result<()> { let left = parquet_exec(); diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs index 217570846d56e..f12e5d5f764b0 100644 --- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -19,9 +19,9 @@ use insta::assert_snapshot; use std::sync::Arc; use crate::physical_optimizer::test_utils::{ - bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, - projection_exec, repartition_exec, sort_exec, sort_expr, sort_expr_options, - sort_merge_join_exec, sort_preserving_merge_exec, union_exec, + bounded_window_exec, global_limit_exec, hash_join_exec, local_limit_exec, + memory_exec, projection_exec, repartition_exec, sort_exec, sort_expr, + sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, union_exec, }; use arrow::compute::SortOptions; @@ -30,8 +30,8 @@ use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTab use datafusion::prelude::{CsvReadOptions, SessionContext}; use datafusion_common::config::ConfigOptions; use datafusion_common::{JoinType, Result, ScalarValue}; -use datafusion_physical_expr::Partitioning; use datafusion_physical_expr::expressions::{Literal, col}; +use datafusion_physical_expr::{Partitioning, RangePartitioning, SplitPoint}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan; @@ -400,6 +400,52 @@ fn assert_sanity_check(plan: &Arc, is_sane: bool) { ); } +fn range_partitioned_exec( + schema: &SchemaRef, + key: &str, + split_points: impl IntoIterator, +) -> Result> { + let split_points = split_points + .into_iter() + .map(|value| SplitPoint::new(vec![ScalarValue::Int32(Some(value))])) + .collect(); + let partitioning = Partitioning::Range(RangePartitioning::try_new( + [sort_expr(key, schema)].into(), + split_points, + )?); + let input = memory_exec(schema); + + RepartitionExec::try_new(input, partitioning) + .map(|exec| Arc::new(exec) as Arc) +} + +#[test] +fn test_partitioned_hash_join_requires_co_partitioned_children() -> Result<()> { + let schema = create_test_schema2(); + let join_on = vec![(col("a", &schema)?, col("b", &schema)?)]; + let right = range_partitioned_exec(&schema, "b", [10])?; + + let valid_join = hash_join_exec( + range_partitioned_exec(&schema, "a", [10])?, + Arc::clone(&right), + join_on.clone(), + None, + &JoinType::Inner, + )?; + assert_sanity_check(&valid_join, true); + + let invalid_join = hash_join_exec( + range_partitioned_exec(&schema, "a", [20])?, + right, + join_on, + None, + &JoinType::Inner, + )?; + assert_sanity_check(&invalid_join, false); + + Ok(()) +} + #[tokio::test] /// Tests that plan is valid when the sort requirements are satisfied. async fn test_bounded_window_agg_sort_requirement() -> Result<()> { diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 2e0aaaf3fb4b7..8ac5de2a87b9d 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -244,17 +244,13 @@ impl RangePartitioning { self.split_points.len() + 1 } - /// Returns true when `self` and `other` describe the same range partition - /// map. + /// Returns true when `self` and `other` have the same range boundaries. /// - /// Single-partition range partitionings are always compatible. Otherwise, - /// the two partitionings must have identical split points and equivalent - /// ordering expressions with the same sort options. - pub fn compatible_with( - &self, - other: &Self, - eq_properties: &EquivalenceProperties, - ) -> bool { + /// Single-partition range partitionings always have the same boundaries. Otherwise, + /// the two partitionings must have identical split points, ordering width, + /// and sort options. This does not compare ordering expressions, callers + /// should validate the range keys separately. + fn same_boundaries(&self, other: &Self) -> bool { if self.partition_count() == 1 && other.partition_count() == 1 { return true; } @@ -274,18 +270,7 @@ impl RangePartitioning { return false; } - let left_exprs = self - .ordering - .iter() - .map(|sort_expr| Arc::clone(&sort_expr.expr)) - .collect::>(); - let right_exprs = other - .ordering - .iter() - .map(|sort_expr| Arc::clone(&sort_expr.expr)) - .collect::>(); - - equivalent_exprs(&left_exprs, &right_exprs, eq_properties) + true } /// Calculates the range partitioning after applying the given projection. @@ -406,37 +391,56 @@ impl Partitioning { } } - /// Returns true when `self` and `other` describe compatible partition maps. + /// Returns true when two partitionings both satisfy their own distribution + /// requirements and can be paired by partition index. + /// + /// Use this for multi-input operators, such as partitioned joins, where + /// each child has a different schema, required [`Distribution`], and + /// expression-equivalence context. + /// + /// ```text + /// # co-partitioned: each side satisfies its own requirement, and boundaries match + /// left: Range(left.a ASC, [10, 20]), required KeyPartitioned(left.a) + /// right: Range(right.x ASC, [10, 20]), required KeyPartitioned(right.x) + /// + /// # not compatible: right side does not satisfy a hash-specific requirement + /// left: Range(left.a ASC, [10, 20]), required KeyPartitioned(left.a) + /// right: Range(right.x ASC, [10, 20]), required HashPartitioned(right.x) /// - /// Compatible partition maps can be used for partition-local behavior: if - /// this returns true, partition `i` from both partitionings can be treated - /// as covering the same partition domain. This is stricter than - /// [`Self::satisfaction`], which only answers whether this partitioning can - /// satisfy a required distribution. - pub fn compatible_with( + /// # not compatible: boundaries differ + /// left: Range(left.a ASC, [10, 20]), required KeyPartitioned(left.a) + /// right: Range(right.x ASC, [15, 20]), required KeyPartitioned(right.x) + /// ``` + pub fn co_partitioned_with( &self, - other: &Self, + required: &Distribution, eq_properties: &EquivalenceProperties, + other: &Self, + other_required: &Distribution, + other_eq_properties: &EquivalenceProperties, ) -> bool { + if !self + .satisfaction(required, eq_properties, false) + .is_satisfied() + || !other + .satisfaction(other_required, other_eq_properties, false) + .is_satisfied() + { + return false; + } + if self.partition_count() == 1 && other.partition_count() == 1 { return true; } + if self.partition_count() != other.partition_count() { + return false; + } + match (self, other) { - ( - Partitioning::Hash(left_exprs, left_count), - Partitioning::Hash(right_exprs, right_count), - ) => { - if left_count != right_count { - return false; - } - if left_exprs.is_empty() || right_exprs.is_empty() { - return false; - } - equivalent_exprs(left_exprs, right_exprs, eq_properties) - } + (Partitioning::Hash(_, _), Partitioning::Hash(_, _)) => true, (Partitioning::Range(left), Partitioning::Range(right)) => { - left.compatible_with(right, eq_properties) + left.same_boundaries(right) } _ => false, } @@ -461,6 +465,47 @@ impl Partitioning { }) } + fn key_expr_satisfaction( + partition_exprs: &[Arc], + required_exprs: &[Arc], + eq_properties: &EquivalenceProperties, + allow_subset: bool, + ) -> PartitioningSatisfaction { + if partition_exprs.is_empty() || required_exprs.is_empty() { + return PartitioningSatisfaction::NotSatisfied; + } + + if equivalent_exprs(required_exprs, partition_exprs, eq_properties) { + return PartitioningSatisfaction::Exact; + } + + if !allow_subset { + return PartitioningSatisfaction::NotSatisfied; + } + + let eq_groups = eq_properties.eq_group(); + if eq_groups.is_empty() { + if Self::is_subset_partitioning(partition_exprs, required_exprs) { + PartitioningSatisfaction::Subset + } else { + PartitioningSatisfaction::NotSatisfied + } + } else { + let normalized_partition_exprs = + normalize_exprs(partition_exprs, eq_properties); + let normalized_required_exprs = + normalize_exprs(required_exprs, eq_properties); + if Self::is_subset_partitioning( + &normalized_partition_exprs, + &normalized_required_exprs, + ) { + PartitioningSatisfaction::Subset + } else { + PartitioningSatisfaction::NotSatisfied + } + } + } + #[deprecated(since = "52.0.0", note = "Use satisfaction instead")] pub fn satisfy( &self, @@ -484,51 +529,52 @@ impl Partitioning { Distribution::SinglePartition if self.partition_count() == 1 => { PartitioningSatisfaction::Exact } - // When partition count is 1, hash requirement is satisfied. - Distribution::HashPartitioned(_) if self.partition_count() == 1 => { + // When partition count is 1, partitioned requirements are satisfied. + Distribution::HashPartitioned(_) | Distribution::KeyPartitioned(_) + if self.partition_count() == 1 => + { PartitioningSatisfaction::Exact } + Distribution::KeyPartitioned(required_exprs) => match self { + Partitioning::Hash(partition_exprs, _) => Self::key_expr_satisfaction( + partition_exprs, + required_exprs, + eq_properties, + allow_subset, + ), + Partitioning::Range(range) => { + let partition_exprs = range + .ordering + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + Self::key_expr_satisfaction( + &partition_exprs, + required_exprs, + eq_properties, + allow_subset, + ) + } + Partitioning::RoundRobinBatch(_) + | Partitioning::UnknownPartitioning(_) => { + PartitioningSatisfaction::NotSatisfied + } + }, Distribution::HashPartitioned(required_exprs) => match self { // Here we do not check the partition count for hash partitioning and assumes the partition count // and hash functions in the system are the same. In future if we plan to support storage partition-wise joins, // then we need to have the partition count and hash functions validation. - Partitioning::Hash(partition_exprs, _) => { - // Empty hash partitioning is invalid - if partition_exprs.is_empty() || required_exprs.is_empty() { - return PartitioningSatisfaction::NotSatisfied; - } - - if equivalent_exprs(required_exprs, partition_exprs, eq_properties) { - return PartitioningSatisfaction::Exact; - } - - let eq_groups = eq_properties.eq_group(); - if !eq_groups.is_empty() { - if allow_subset { - let normalized_partition_exprs = - normalize_exprs(partition_exprs, eq_properties); - let normalized_required_exprs = - normalize_exprs(required_exprs, eq_properties); - if Self::is_subset_partitioning( - &normalized_partition_exprs, - &normalized_required_exprs, - ) { - return PartitioningSatisfaction::Subset; - } - } - } else if allow_subset - && Self::is_subset_partitioning(partition_exprs, required_exprs) - { - return PartitioningSatisfaction::Subset; - } - - PartitioningSatisfaction::NotSatisfied - } + Partitioning::Hash(partition_exprs, _) => Self::key_expr_satisfaction( + partition_exprs, + required_exprs, + eq_properties, + allow_subset, + ), Partitioning::RoundRobinBatch(_) - | Partitioning::Range(_) | Partitioning::UnknownPartitioning(_) => { PartitioningSatisfaction::NotSatisfied } + Partitioning::Range(_) => PartitioningSatisfaction::NotSatisfied, }, Distribution::SinglePartition => PartitioningSatisfaction::NotSatisfied, } @@ -596,9 +642,31 @@ pub enum Distribution { /// Requires children to be distributed in such a way that the same /// values of the keys end up in the same partition HashPartitioned(Vec>), + /// Requires rows with equal values for the given keys to be colocated in + /// the same partition, without requiring a specific partitioning algorithm. + /// + /// Unlike [`Self::HashPartitioned`], this can be satisfied by non-hash + /// partitioning such as range partitioning. A partitioning on a subset of + /// these keys can also satisfy this requirement because rows equal on all + /// required keys are also equal on any subset. + /// + /// For multi-input operators, satisfaction alone is not enough: each input + /// may satisfy its own key requirement while using incompatible partition + /// boundaries. Use [`Partitioning::co_partitioned_with`] before pairing + /// partitions by index. + KeyPartitioned(Vec>), } impl Distribution { + /// Returns key expressions for distribution variants that require + /// co-locating equal key values. + pub fn key_exprs(&self) -> Option<&[Arc]> { + match self { + Self::HashPartitioned(exprs) | Self::KeyPartitioned(exprs) => Some(exprs), + Self::UnspecifiedDistribution | Self::SinglePartition => None, + } + } + /// Creates a `Partitioning` that satisfies this `Distribution` pub fn create_partitioning(self, partition_count: usize) -> Partitioning { match self { @@ -606,7 +674,7 @@ impl Distribution { Partitioning::UnknownPartitioning(partition_count) } Distribution::SinglePartition => Partitioning::UnknownPartitioning(1), - Distribution::HashPartitioned(expr) => { + Distribution::HashPartitioned(expr) | Distribution::KeyPartitioned(expr) => { Partitioning::Hash(expr, partition_count) } } @@ -621,6 +689,9 @@ impl Display for Distribution { Distribution::HashPartitioned(exprs) => { write!(f, "HashPartitioned[{}])", format_physical_expr_list(exprs)) } + Distribution::KeyPartitioned(exprs) => { + write!(f, "KeyPartitioned[{}])", format_physical_expr_list(exprs)) + } } } } @@ -696,6 +767,13 @@ mod tests { Distribution::HashPartitioned(self.cols(indices)) } + fn key_partitioned_distribution( + &self, + indices: impl IntoIterator, + ) -> Distribution { + Distribution::KeyPartitioned(self.cols(indices)) + } + fn range_sort_expr( &self, index: usize, @@ -756,6 +834,7 @@ mod tests { Distribution::UnspecifiedDistribution, Distribution::SinglePartition, fixture.hash_distribution([0, 1]), + Distribution::KeyPartitioned(fixture.cols([0, 1])), ]; let single_partition = Partitioning::UnknownPartitioning(1); @@ -793,6 +872,9 @@ mod tests { Distribution::HashPartitioned(_) => { assert_eq!(result, (true, false, false, true, false)) } + Distribution::KeyPartitioned(_) => { + assert_eq!(result, (true, false, false, true, false)) + } } } @@ -1113,6 +1195,101 @@ mod tests { Ok(()) } + #[test] + fn key_partitioned_satisfaction_is_exact() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let hash_a = fixture.hash_partitioning([0], 2); + let hash_ab = fixture.hash_partitioning([0, 1], 2); + let range_a = fixture + .range_partitioning([0], vec![int_split_point([10]), int_split_point([20])]); + let range_ab = + fixture.range_partitioning([0, 1], vec![int_split_point([10, 20])]); + + let exact_cases = [ + ( + "Hash([a]) vs KeyPartitioned([a])", + &hash_a, + fixture.key_partitioned_distribution([0]), + ), + ( + "Range([a]) vs KeyPartitioned([a])", + &range_a, + fixture.key_partitioned_distribution([0]), + ), + ]; + for (desc, partitioning, requirement) in exact_cases { + for allow_subset in [true, false] { + assert_eq!( + partitioning.satisfaction( + &requirement, + &fixture.eq_properties, + allow_subset, + ), + PartitioningSatisfaction::Exact, + "Failed for {desc} with allow_subset={allow_subset}" + ); + } + } + + let subset_cases = [ + ( + "Hash([a]) vs KeyPartitioned([a, b])", + &hash_a, + fixture.key_partitioned_distribution([0, 1]), + ), + ( + "Range([a]) vs KeyPartitioned([a, b])", + &range_a, + fixture.key_partitioned_distribution([0, 1]), + ), + ]; + for (desc, partitioning, requirement) in subset_cases { + assert_eq!( + partitioning.satisfaction(&requirement, &fixture.eq_properties, true), + PartitioningSatisfaction::Subset, + "Failed for {desc} with subset enabled" + ); + assert_eq!( + partitioning.satisfaction(&requirement, &fixture.eq_properties, false), + PartitioningSatisfaction::NotSatisfied, + "Failed for {desc} with subset disabled" + ); + } + + let not_satisfied_cases = [ + ( + "Range([a]) vs KeyPartitioned([b])", + &range_a, + fixture.key_partitioned_distribution([1]), + ), + ( + "Hash([a, b]) vs KeyPartitioned([a])", + &hash_ab, + fixture.key_partitioned_distribution([0]), + ), + ( + "Range([a, b]) vs KeyPartitioned([a])", + &range_ab, + fixture.key_partitioned_distribution([0]), + ), + ]; + for (desc, partitioning, requirement) in not_satisfied_cases { + for allow_subset in [true, false] { + assert_eq!( + partitioning.satisfaction( + &requirement, + &fixture.eq_properties, + allow_subset, + ), + PartitioningSatisfaction::NotSatisfied, + "Failed for {desc} with allow_subset={allow_subset}" + ); + } + } + + Ok(()) + } + fn int_split_point(values: impl IntoIterator) -> SplitPoint { SplitPoint::new( values @@ -1255,154 +1432,140 @@ mod tests { } #[test] - fn test_range_partitioning_compatible_with() -> Result<()> { + fn range_partitionings_are_co_partitioned_by_boundaries() -> Result<()> { let fixture = PartitioningTestFixture::int64(&["a", "b"])?; - let mut eq_properties = fixture.eq_properties.clone(); - eq_properties.add_equal_conditions(fixture.col(0), fixture.col(1))?; - - let split_points = vec![int_split_point([10]), int_split_point([20])]; - let range_a = fixture.range([0], split_points.clone()); - let range_a_same = fixture.range([0], split_points.clone()); - let range_b_equivalent = fixture.range([1], split_points.clone()); - let range_b_different_split = fixture.range([1], vec![int_split_point([30])]); - let range_a_desc = RangePartitioning::try_new( - [fixture.range_sort_expr(0, SortOptions::new(true, false))].into(), - vec![int_split_point([10])], - )?; - let single_partition_range_a = fixture.range([0], vec![]); - let single_partition_range_b = fixture.range([1], vec![]); - - assert!(range_a.compatible_with(&range_a_same, &fixture.eq_properties)); - assert!(range_a.compatible_with(&range_b_equivalent, &eq_properties)); - assert!(!range_a.compatible_with(&range_b_equivalent, &fixture.eq_properties)); - assert!(!range_a.compatible_with(&range_b_different_split, &eq_properties)); - assert!(!range_a.compatible_with(&range_a_desc, &eq_properties)); - assert!( - single_partition_range_a - .compatible_with(&single_partition_range_b, &fixture.eq_properties) + let left = fixture + .range_partitioning([0], vec![int_split_point([10]), int_split_point([20])]); + let right_same_map = fixture + .range_partitioning([1], vec![int_split_point([10]), int_split_point([20])]); + let right_different_split = fixture + .range_partitioning([1], vec![int_split_point([15]), int_split_point([20])]); + let right_desc = fixture.range_partitioning_with_ordering( + [fixture.range_sort_expr(1, SortOptions::new(true, false))].into(), + vec![int_split_point([20]), int_split_point([10])], ); - assert!( - fixture - .range_partitioning([0], vec![int_split_point([10])]) - .compatible_with( - &fixture.range_partitioning([1], vec![int_split_point([10])]), - &eq_properties - ) - ); - assert!( - !fixture - .range_partitioning([0], vec![int_split_point([10])]) - .compatible_with( - &fixture.range_partitioning([0], vec![int_split_point([20])]), - &fixture.eq_properties - ) - ); - assert!( - !fixture - .range_partitioning([0], vec![int_split_point([10])]) - .compatible_with( - &fixture.hash_partitioning([0], 2), - &fixture.eq_properties - ) - ); + let test_cases = [ + ( + "same boundaries with matching key requirements", + fixture.key_partitioned_distribution([0]), + right_same_map.clone(), + fixture.key_partitioned_distribution([1]), + true, + ), + ( + "different split points", + fixture.key_partitioned_distribution([0]), + right_different_split, + fixture.key_partitioned_distribution([1]), + false, + ), + ( + "different sort options", + fixture.key_partitioned_distribution([0]), + right_desc, + fixture.key_partitioned_distribution([1]), + false, + ), + ( + "range cannot satisfy hash requirement", + fixture.hash_distribution([0]), + right_same_map, + fixture.key_partitioned_distribution([1]), + false, + ), + ]; + for (desc, left_requirement, right, right_requirement, expected) in test_cases { + assert_eq!( + left.co_partitioned_with( + &left_requirement, + &fixture.eq_properties, + &right, + &right_requirement, + &fixture.eq_properties, + ), + expected, + "Failed for {desc}" + ); + } Ok(()) } #[test] - fn test_hash_partitioning_compatible_with() -> Result<()> { + fn co_partitioned_with_rejects_subset_key_satisfaction() -> Result<()> { let fixture = PartitioningTestFixture::int64(&["a", "b"])?; - let mut eq_properties = fixture.eq_properties.clone(); - eq_properties.add_equal_conditions(fixture.col(0), fixture.col(1))?; + let left = fixture + .range_partitioning([0], vec![int_split_point([10]), int_split_point([20])]); + let right = fixture.range_partitioning([0, 1], vec![int_split_point([10, 100])]); - assert!( - fixture.hash_partitioning([0], 2).compatible_with( - &fixture.hash_partitioning([0], 2), - &fixture.eq_properties - ) - ); - assert!( - fixture - .hash_partitioning([0], 2) - .compatible_with(&fixture.hash_partitioning([1], 2), &eq_properties) - ); - assert!( - !fixture.hash_partitioning([0], 2).compatible_with( - &fixture.hash_partitioning([1], 2), - &fixture.eq_properties - ) - ); - assert!( - !fixture.hash_partitioning([0], 2).compatible_with( - &fixture.hash_partitioning([0], 3), - &fixture.eq_properties - ) + assert_eq!( + right.satisfaction( + &fixture.key_partitioned_distribution([0]), + &fixture.eq_properties, + false, + ), + PartitioningSatisfaction::NotSatisfied ); - assert!(!fixture.hash_partitioning([0], 2).compatible_with( - &fixture.hash_partitioning([0, 1], 2), - &fixture.eq_properties - )); - assert!( - !Partitioning::Hash(vec![], 2) - .compatible_with(&Partitioning::Hash(vec![], 2), &fixture.eq_properties) + assert_eq!( + left.satisfaction( + &fixture.key_partitioned_distribution([0, 1]), + &fixture.eq_properties, + true, + ), + PartitioningSatisfaction::Subset ); - assert!(!fixture.hash_partitioning([0], 2).compatible_with( - &fixture.range_partitioning([0], vec![int_split_point([10])]), - &fixture.eq_properties + assert!(!left.co_partitioned_with( + &fixture.key_partitioned_distribution([0, 1]), + &fixture.eq_properties, + &right, + &fixture.key_partitioned_distribution([0]), + &fixture.eq_properties, )); - assert!( - fixture.hash_partitioning([0], 1).compatible_with( - &Partitioning::RoundRobinBatch(1), - &fixture.eq_properties - ) - ); Ok(()) } #[test] - fn test_round_robin_partitioning_compatible_with() { - let eq_properties = EquivalenceProperties::new(Arc::new(Schema::empty())); - - assert!( - Partitioning::RoundRobinBatch(1) - .compatible_with(&Partitioning::RoundRobinBatch(1), &eq_properties) - ); - assert!( - !Partitioning::RoundRobinBatch(2) - .compatible_with(&Partitioning::RoundRobinBatch(2), &eq_properties) - ); - assert!( - Partitioning::RoundRobinBatch(1) - .compatible_with(&Partitioning::UnknownPartitioning(1), &eq_properties) - ); - assert!( - !Partitioning::RoundRobinBatch(2) - .compatible_with(&Partitioning::UnknownPartitioning(2), &eq_properties) - ); - } + fn hash_partitionings_are_co_partitioned_by_count() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let left = fixture.hash_partitioning([0], 2); - #[test] - fn test_unknown_partitioning_compatible_with() { - let eq_properties = EquivalenceProperties::new(Arc::new(Schema::empty())); + let test_cases = [ + ( + "same partition count", + fixture.hash_partitioning([1], 2), + fixture.key_partitioned_distribution([1]), + true, + ), + ( + "different partition count", + fixture.hash_partitioning([1], 3), + fixture.key_partitioned_distribution([1]), + false, + ), + ( + "mixed hash and range partitioning", + fixture.range_partitioning([1], vec![int_split_point([10])]), + fixture.key_partitioned_distribution([1]), + false, + ), + ]; + for (desc, right, right_requirement, expected) in test_cases { + assert_eq!( + left.co_partitioned_with( + &fixture.key_partitioned_distribution([0]), + &fixture.eq_properties, + &right, + &right_requirement, + &fixture.eq_properties, + ), + expected, + "Failed for {desc}" + ); + } - assert!( - Partitioning::UnknownPartitioning(1) - .compatible_with(&Partitioning::UnknownPartitioning(1), &eq_properties) - ); - assert!( - !Partitioning::UnknownPartitioning(2) - .compatible_with(&Partitioning::UnknownPartitioning(2), &eq_properties) - ); - assert!( - Partitioning::UnknownPartitioning(1) - .compatible_with(&Partitioning::RoundRobinBatch(1), &eq_properties) - ); - assert!( - !Partitioning::UnknownPartitioning(2) - .compatible_with(&Partitioning::RoundRobinBatch(2), &eq_properties) - ); + Ok(()) } #[test] diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs index 76cb59a305a5f..3abff3a0eec81 100644 --- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs @@ -713,6 +713,8 @@ fn add_roundrobin_on_top( /// current executor is less than this value. Partition number will be increased. /// * `allow_subset_satisfy_partitioning`: Whether to allow subset partitioning logic in satisfaction checks. /// Set to `false` for partitioned hash joins to ensure exact hash matching. +/// * `force_to_target`: Whether to repartition even when the hash expressions +/// are already satisfied but the partition count differs from `n_target`. /// /// # Returns /// @@ -723,6 +725,7 @@ fn add_hash_on_top( hash_exprs: Vec>, n_target: usize, allow_subset_satisfy_partitioning: bool, + force_to_target: bool, ) -> Result { // Early return if hash repartition is unnecessary // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary. @@ -731,6 +734,7 @@ fn add_hash_on_top( } let dist = Distribution::HashPartitioned(hash_exprs); + let current_partitions = input.plan.output_partitioning().partition_count(); let satisfaction = input.plan.output_partitioning().satisfaction( &dist, input.plan.equivalence_properties(), @@ -740,11 +744,12 @@ fn add_hash_on_top( // Add hash repartitioning when: // - When subset satisfaction is enabled (current >= threshold): only repartition if not satisfied // - When below threshold (current < threshold): repartition if expressions don't match OR to increase parallelism - let needs_repartition = if allow_subset_satisfy_partitioning { + let needs_repartition = if force_to_target { + !satisfaction.is_satisfied() || n_target != current_partitions + } else if allow_subset_satisfy_partitioning { !satisfaction.is_satisfied() } else { - !satisfaction.is_satisfied() - || n_target > input.plan.output_partitioning().partition_count() + !satisfaction.is_satisfied() || n_target > current_partitions }; if needs_repartition { @@ -968,6 +973,32 @@ struct RepartitionRequirementStatus { roundrobin_beneficial_stats: bool, /// Designates whether hash partitioning is necessary. hash_necessary: bool, + /// Designates whether hash repartitioning should force the target + /// partition count even when the hash expressions are already satisfied. + force_hash_to_target: bool, +} + +#[derive(Debug, Clone, Copy, Default)] +struct PartitionedJoinDistribution { + /// Inner partitioned hash join children can be paired by existing Range + /// partitions, so hash repartitioning is not needed. + compatible_range: bool, + /// Partitioned join children have different partition counts. If hash + /// repartitioning is used, both sides must be forced to the target count. + needs_count_alignment: bool, +} + +fn requirement_includes_grouping_id(requirement: &Distribution) -> bool { + // Grouping set aggregates (ROLLUP, CUBE, GROUPING SETS) require exact hash + // partitioning on all group columns including __grouping_id to ensure + // partial aggregates from different partitions are correctly combined. + requirement.key_exprs().is_some_and(|exprs| { + exprs.iter().any(|expr| { + (expr.as_ref() as &dyn Any) + .downcast_ref::() + .is_some_and(|col| col.name() == Aggregate::INTERNAL_GROUPING_ID) + }) + }) } /// Calculates the `RepartitionRequirementStatus` for each children to generate @@ -1010,6 +1041,7 @@ fn get_repartition_requirement_status( let children = plan.children(); let rr_beneficial = plan.benefits_from_input_partitioning(); let requirements = plan.required_input_distribution(); + let join_distribution = partitioned_join_distribution(plan); let mut repartition_status_flags = vec![]; for (child, requirement, roundrobin_beneficial) in izip!(children.into_iter(), requirements, rr_beneficial) @@ -1024,30 +1056,39 @@ fn get_repartition_requirement_status( Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size), Precision::Absent => true, }; - let is_hash = matches!(requirement, Distribution::HashPartitioned(_)); - // Hash re-partitioning is necessary when the input has more than one - // partitions: + let is_partitioned_requirement = requirement.key_exprs().is_some(); + // Hash repartitioning may be necessary when the input has more than one + // partition, or when repartitioning one sibling requires aligning all + // key-partitioned siblings. let multi_partitions = child.output_partitioning().partition_count() > 1; let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats; - needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible); + needs_alignment |= is_partitioned_requirement + && !join_distribution.compatible_range + && (multi_partitions || roundrobin_sensible); repartition_status_flags.push(( - is_hash, + is_partitioned_requirement, RepartitionRequirementStatus { requirement, roundrobin_beneficial, roundrobin_beneficial_stats, - hash_necessary: is_hash && multi_partitions, + hash_necessary: is_partitioned_requirement + && multi_partitions + && !join_distribution.compatible_range, + // Hash satisfaction checks key expressions, not matching + // partition counts, so force repartition when join sides differ. + force_hash_to_target: is_partitioned_requirement + && join_distribution.needs_count_alignment, }, )); } - // Align hash necessary flags for hash partitions to generate consistent + // Align hash necessary flags for key partitions to generate consistent // hash partitions at each children: if needs_alignment { - // When there is at least one hash requirement that is necessary or - // beneficial according to statistics, make all children require hash - // repartitioning: - for (is_hash, status) in &mut repartition_status_flags { - if *is_hash { + // When there is at least one key-partitioned requirement that is necessary + // or beneficial according to statistics, make all key-partitioned + // children require hash repartitioning: + for (is_partitioned_requirement, status) in &mut repartition_status_flags { + if *is_partitioned_requirement { status.hash_necessary = true; } } @@ -1058,6 +1099,59 @@ fn get_repartition_requirement_status( .collect()) } +/// Returns distribution state for a partitioned join's children. +/// +/// This is optimizer policy: partitioned joins require children that can be +/// paired by partition index. Inner hash joins can reuse compatible range +/// partitioning; otherwise the existing hash repartitioning policy applies. +fn partitioned_join_distribution( + plan: &Arc, +) -> PartitionedJoinDistribution { + let Some(hash_join) = plan.downcast_ref::() else { + return Default::default(); + }; + + if hash_join.mode != PartitionMode::Partitioned { + return Default::default(); + } + + let children = plan.children(); + let [left, right] = children.as_slice() else { + return Default::default(); + }; + let needs_count_alignment = left.output_partitioning().partition_count() + != right.output_partitioning().partition_count(); + + let requirements = plan.required_input_distribution(); + let left_partitioning = left.output_partitioning(); + let right_partitioning = right.output_partitioning(); + let compatible_range = match requirements.as_slice() { + [ + left_requirement @ Distribution::KeyPartitioned(_), + right_requirement @ Distribution::KeyPartitioned(_), + ] if hash_join.join_type == JoinType::Inner + && matches!( + (left_partitioning, right_partitioning), + (Partitioning::Range(_), Partitioning::Range(_)) + ) => + { + left_partitioning.co_partitioned_with( + left_requirement, + left.equivalence_properties(), + right_partitioning, + right_requirement, + right.equivalence_properties(), + ) + } + _ => false, + }; + + PartitionedJoinDistribution { + compatible_range, + needs_count_alignment, + } +} + /// This function checks whether we need to add additional data exchange /// operators to satisfy distribution requirements. Since this function /// takes care of such requirements, we should avoid manually adding data @@ -1184,6 +1278,7 @@ pub fn ensure_distribution( roundrobin_beneficial, roundrobin_beneficial_stats, hash_necessary, + force_hash_to_target, }, )| { let increases_partition_count = @@ -1202,18 +1297,6 @@ pub fn ensure_distribution( // 3. Not a grouping set aggregate (requires exact hash including __grouping_id) let current_partitions = child.plan.output_partitioning().partition_count(); - // Check if the hash partitioning requirement includes __grouping_id column. - // Grouping set aggregates (ROLLUP, CUBE, GROUPING SETS) require exact hash - // partitioning on all group columns including __grouping_id to ensure partial - // aggregates from different partitions are correctly combined. - let requires_grouping_id = matches!(&requirement, Distribution::HashPartitioned(exprs) - if exprs.iter().any(|expr| { - (expr.as_ref() as &dyn Any) - .downcast_ref::() - .is_some_and(|col| col.name() == Aggregate::INTERNAL_GROUPING_ID) - }) - ); - let allow_subset_satisfy_partitioning = (current_partitions >= subset_satisfaction_threshold // `preserve_file_partitions` exposes existing file-group @@ -1223,7 +1306,7 @@ pub fn ensure_distribution( || (config.optimizer.preserve_file_partitions > 0 && current_partitions < target_partitions)) && !is_partitioned_join - && !requires_grouping_id; + && !requirement_includes_grouping_id(&requirement); // When `repartition_file_scans` is set, attempt to increase // parallelism at the source. @@ -1244,7 +1327,8 @@ pub fn ensure_distribution( Distribution::SinglePartition => { child = add_merge_on_top(child, removed_fetch); } - Distribution::HashPartitioned(exprs) => { + Distribution::HashPartitioned(exprs) + | Distribution::KeyPartitioned(exprs) => { // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background // When inserting hash is necessary to satisfy hash requirement, insert hash repartition. if hash_necessary { @@ -1253,6 +1337,7 @@ pub fn ensure_distribution( exprs.to_vec(), target_partitions, allow_subset_satisfy_partitioning, + force_hash_to_target, )?; } } @@ -1311,7 +1396,9 @@ pub fn ensure_distribution( // no ordering requirement match requirement { // Operator requires specific distribution. - Distribution::SinglePartition | Distribution::HashPartitioned(_) => { + Distribution::SinglePartition + | Distribution::HashPartitioned(_) + | Distribution::KeyPartitioned(_) => { // If the parent doesn't maintain input order, preserving // ordering is pointless. However, if it does maintain // input order, we keep order-preserving variants so diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs index 261cf701c870f..ca6b32c24ca3e 100644 --- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs @@ -120,6 +120,8 @@ fn stronger_distribution(a: &Distribution, b: &Distribution) -> Distribution { } (Distribution::HashPartitioned(_), _) => a.clone(), (_, Distribution::HashPartitioned(_)) => b.clone(), + (Distribution::KeyPartitioned(_), _) => a.clone(), + (_, Distribution::KeyPartitioned(_)) => b.clone(), _ => Distribution::UnspecifiedDistribution, } } diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index fb91ae46a2a08..fa3d5796fe6ab 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -271,8 +271,9 @@ impl ExecutionPlan for OutputRequirementExec { requirements = OrderingRequirements::new_alternatives(updated_reqs, soft); } - let dist_req = match &self.required_input_distribution()[0] { - Distribution::HashPartitioned(exprs) => { + let dist_req = { + let dist_req = &self.required_input_distribution()[0]; + if let Some(exprs) = dist_req.key_exprs() { let mut updated_exprs = vec![]; for expr in exprs { let Some(new_expr) = update_expr(expr, projection.expr(), false)? @@ -281,9 +282,18 @@ impl ExecutionPlan for OutputRequirementExec { }; updated_exprs.push(new_expr); } - Distribution::HashPartitioned(updated_exprs) + match dist_req { + Distribution::HashPartitioned(_) => { + Distribution::HashPartitioned(updated_exprs) + } + Distribution::KeyPartitioned(_) => { + Distribution::KeyPartitioned(updated_exprs) + } + _ => unreachable!(), + } + } else { + dist_req.clone() } - dist => dist.clone(), }; make_with_child(projection, &self.input()).map(|input| { diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs index 40c6245d894d4..96900490e0921 100644 --- a/datafusion/physical-optimizer/src/sanity_checker.rs +++ b/datafusion/physical-optimizer/src/sanity_checker.rs @@ -31,7 +31,9 @@ use datafusion_common::plan_err; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; -use datafusion_physical_plan::joins::SymmetricHashJoinExec; +use datafusion_physical_plan::joins::{ + HashJoinExec, PartitionMode, SymmetricHashJoinExec, +}; use datafusion_physical_plan::{ExecutionPlanProperties, get_plan_string}; use crate::PhysicalOptimizerRule; @@ -178,6 +180,50 @@ pub fn check_plan_sanity( } } + check_partitioned_join_distribution(plan)?; + + Ok(()) +} + +fn check_partitioned_join_distribution(plan: &Arc) -> Result<()> { + let Some(hash_join) = plan.downcast_ref::() else { + return Ok(()); + }; + + if hash_join.mode != PartitionMode::Partitioned { + return Ok(()); + } + + let children = plan.children(); + let requirements = plan.required_input_distribution(); + let ([left, right], [left_req, right_req]) = + (children.as_slice(), requirements.as_slice()) + else { + return plan_err!( + "Invalid HashJoinExec: expected two children and two distribution requirements" + ); + }; + + if !left.output_partitioning().co_partitioned_with( + left_req, + left.equivalence_properties(), + right.output_partitioning(), + right_req, + right.equivalence_properties(), + ) { + let plan_str = get_plan_string(plan); + return plan_err!( + "Plan: {:?} does not satisfy partitioned join co-partitioning requirements: \ + left requirement: {}, left output partitioning: {}; \ + right requirement: {}, right output partitioning: {}", + plan_str, + left_req, + left.output_partitioning(), + right_req, + right.output_partitioning() + ); + } + Ok(()) } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 76abf73e0ebbe..557bebe1d88db 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -164,8 +164,13 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { check_default_invariants(self, check) } - /// Specifies the data distribution requirements for all the - /// children for this `ExecutionPlan`, By default it's [[Distribution::UnspecifiedDistribution]] for each child, + /// Specifies the data distribution requirements for all the children for + /// this `ExecutionPlan`. + /// + /// By default, each child has [`Distribution::UnspecifiedDistribution`]. + /// Multi-input operators that use [`Distribution::KeyPartitioned`] must + /// use [`Partitioning::co_partitioned_with`] to verify that satisfied + /// children can be paired by partition index. fn required_input_distribution(&self) -> Vec { vec![Distribution::UnspecifiedDistribution; self.children().len()] } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index b1d387ea74557..f636416987ceb 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -24,8 +24,8 @@ use std::vec; use crate::ExecutionPlanProperties; use crate::execution_plan::{ - EmissionType, boundedness_from_children, has_same_children_properties, - stub_properties, + EmissionType, InvariantLevel, boundedness_from_children, check_default_invariants, + has_same_children_properties, stub_properties, }; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, @@ -870,21 +870,63 @@ impl HashJoinExec { return false; } - // `preserve_file_partitions` can report Hash partitioning for Hive-style - // file groups, but those partitions are not actually hash-distributed. - // Partitioned dynamic filters rely on hash routing, so disable them in - // this mode to avoid incorrect results. Follow-up work: enable dynamic - // filtering for preserve_file_partitioned scans (issue #20195). - // https://github.com/apache/datafusion/issues/20195 - if config.optimizer.preserve_file_partitions > 0 - && self.mode == PartitionMode::Partitioned - { - return false; + if self.mode == PartitionMode::Partitioned { + // `preserve_file_partitions` can report Hash partitioning for + // Hive-style file groups, but those partitions are not actually + // hash-distributed. Partitioned dynamic filters rely on hash + // routing, so disable them in this mode to avoid incorrect + // results. Follow-up work: enable dynamic filtering for + // preserve_file_partitioned scans (issue #20195). + // https://github.com/apache/datafusion/issues/20195 + if config.optimizer.preserve_file_partitions > 0 { + return false; + } + + // Partitioned dynamic filters route probe rows with + // `hash(join_key) % partition_count`. That is only valid when + // partition ids are hash buckets with the same bucket count, or + // when there is only one partition and no routing choice exists. + // This also rejects non-hash partitioning such as + // `Partitioning::Range`. + if !self.has_partitioned_dynamic_filter_routing() { + return false; + } } true } + fn has_partitioned_dynamic_filter_routing(&self) -> bool { + match ( + self.left.output_partitioning(), + self.right.output_partitioning(), + ) { + ( + Partitioning::Hash(_, left_partition_count), + Partitioning::Hash(_, right_partition_count), + ) => left_partition_count == right_partition_count, + (left_partitioning, right_partitioning) => { + left_partitioning.partition_count() == 1 + && right_partitioning.partition_count() == 1 + } + } + } + + fn partitioned_children_co_partitioned(&self) -> bool { + let requirements = self.required_input_distribution(); + let [left_requirement, right_requirement] = requirements.as_slice() else { + return false; + }; + + self.left.output_partitioning().co_partitioned_with( + left_requirement, + self.left.equivalence_properties(), + self.right.output_partitioning(), + right_requirement, + self.right.equivalence_properties(), + ) + } + /// left (build) side which gets hashed pub fn left(&self) -> &Arc { &self.left @@ -1230,6 +1272,21 @@ impl ExecutionPlan for HashJoinExec { "HashJoinExec" } + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + check_default_invariants(self, check)?; + + if matches!(check, InvariantLevel::Executable) + && self.mode == PartitionMode::Partitioned + && !self.partitioned_children_co_partitioned() + { + return plan_err!( + "Invalid HashJoinExec, partitioned children are not co-partitioned, consider using RepartitionExec" + ); + } + + Ok(()) + } + fn properties(&self) -> &Arc { &self.cache } @@ -1246,10 +1303,17 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) .unzip(); - vec![ - Distribution::HashPartitioned(left_expr), - Distribution::HashPartitioned(right_expr), - ] + if self.join_type == JoinType::Inner { + vec![ + Distribution::KeyPartitioned(left_expr), + Distribution::KeyPartitioned(right_expr), + ] + } else { + vec![ + Distribution::HashPartitioned(left_expr), + Distribution::HashPartitioned(right_expr), + ] + } } PartitionMode::Auto => vec![ Distribution::UnspecifiedDistribution, @@ -1318,6 +1382,12 @@ impl ExecutionPlan for HashJoinExec { consider using RepartitionExec" ); + assert_or_internal_err!( + self.mode != PartitionMode::Partitioned + || self.partitioned_children_co_partitioned(), + "Invalid HashJoinExec, partitioned children are not co-partitioned, consider using RepartitionExec" + ); + assert_or_internal_err!( self.mode != PartitionMode::CollectLeft || left_partitions == 1, "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\ @@ -2206,6 +2276,7 @@ mod tests { use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; + use datafusion_physical_expr::{PhysicalSortExpr, RangePartitioning, SplitPoint}; use hashbrown::HashTable; use insta::{allow_duplicates, assert_snapshot}; use rstest::*; @@ -2442,6 +2513,16 @@ mod tests { Ok((join, dynamic_filter)) } + fn repartition_by( + input: Arc, + column_name: &str, + partition_count: usize, + ) -> Result> { + let expr = Arc::new(Column::new_with_schema(column_name, &input.schema())?); + RepartitionExec::try_new(input, Partitioning::Hash(vec![expr], partition_count)) + .map(|exec| Arc::new(exec) as _) + } + async fn join_collect( left: Arc, right: Arc, @@ -5676,7 +5757,6 @@ mod tests { Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _, Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _, )]; - let join_types = vec![ JoinType::Inner, JoinType::Left, @@ -5689,6 +5769,11 @@ mod tests { ]; for join_type in join_types { + let left = + repartition_by(Arc::clone(&left) as Arc, "b1", 1)?; + let right = + repartition_by(Arc::clone(&right) as Arc, "b2", 1)?; + let runtime = RuntimeEnvBuilder::new() .with_memory_limit(100, 1.0) .build_arc()?; @@ -5710,19 +5795,19 @@ mod tests { false, )?; - let stream = join.execute(1, task_ctx)?; + let stream = join.execute(0, task_ctx)?; let err = common::collect(stream).await.unwrap_err(); - // Asserting that stream-level reservation attempting to overallocate - assert_contains!( - err.to_string(), - "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n HashJoinInput[1]" - ); - + // Assert that the stream-level reservation attempts to + // overallocate. Other reservations can appear in the top-consumer + // report before HashJoinInput[0], so avoid depending on its exact + // ordering. assert_contains!( err.to_string(), - "Failed to allocate additional 120.0 B for HashJoinInput[1]" + "Resources exhausted: Additional allocation failed for HashJoinInput[0] with top memory consumers" ); + assert_contains!(err.to_string(), "HashJoinInput[0]"); + assert_contains!(err.to_string(), "Failed to allocate additional"); } Ok(()) @@ -6021,6 +6106,8 @@ mod tests { Arc::clone(&parent_left_schema), None, )?; + let child_left = repartition_by(child_left, "child_key", 4)?; + let child_right = repartition_by(child_right, "child_right_key", 4)?; let child_on = vec![( Arc::new(Column::new_with_schema("child_key", &child_left_schema)?) as _, @@ -6037,6 +6124,7 @@ mod tests { PartitionMode::Partitioned, )?; let child_join: Arc = Arc::new(child_join); + let parent_left = repartition_by(parent_left, "parent_key", 4)?; let parent_on = vec![( Arc::new(Column::new_with_schema("parent_key", &parent_left_schema)?) as _, @@ -6633,23 +6721,128 @@ mod tests { let left = build_table(("a1", &vec![1]), ("b1", &vec![1]), ("c1", &vec![1])); let right = build_table(("a2", &vec![1]), ("b1", &vec![1]), ("c2", &vec![1])); + let session_config = join_dynamic_filter_session_config(0); + let join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::RightSemi, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNull, + false, + )?; + + assert!(!join.allow_join_dynamic_filter_pushdown(session_config.options())); + + Ok(()) + } + + fn range_partitioned_test_input( + schema: SchemaRef, + range_key: &str, + ) -> Result> { + let input = TestMemoryExec::try_new_exec(&[vec![]], Arc::clone(&schema), None)?; + let range_expr = Arc::new(Column::new_with_schema(range_key, &schema)?); + let range_partitioning = Partitioning::Range(RangePartitioning::new( + [PhysicalSortExpr::new_default(range_expr)].into(), + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(10))])], + )); + RepartitionExec::try_new(input, range_partitioning) + .map(|exec| Arc::new(exec) as _) + } + + fn hash_partitioned_test_input( + schema: SchemaRef, + hash_key: &str, + partition_count: usize, + ) -> Result> { + let input = TestMemoryExec::try_new_exec(&[vec![]], Arc::clone(&schema), None)?; + let hash_expr = Arc::new(Column::new_with_schema(hash_key, &schema)?); + RepartitionExec::try_new( + input, + Partitioning::Hash(vec![hash_expr], partition_count), + ) + .map(|exec| Arc::new(exec) as _) + } + + fn join_dynamic_filter_session_config( + preserve_file_partitions: usize, + ) -> SessionConfig { let mut session_config = SessionConfig::default(); session_config .options_mut() .optimizer .enable_join_dynamic_filter_pushdown = true; + session_config + .options_mut() + .optimizer + .preserve_file_partitions = preserve_file_partitions; + session_config + } - let join = HashJoinExec::try_new( + fn partitioned_inner_hash_join( + left: Arc, + right: Arc, + on: JoinOn, + ) -> Result { + HashJoinExec::try_new( left, right, on, None, - &JoinType::RightSemi, + &JoinType::Inner, None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNull, + PartitionMode::Partitioned, + NullEquality::NullEqualsNothing, false, - )?; + ) + } + + #[test] + fn dynamic_filter_rejects_range_partitioning() -> Result<()> { + let (left_schema, right_schema, on) = build_schema_and_on()?; + let left = range_partitioned_test_input(left_schema, "b1")?; + let right = range_partitioned_test_input(right_schema, "b1")?; + + let session_config = join_dynamic_filter_session_config(0); + let join = partitioned_inner_hash_join(left, right, on)?; + + assert!(matches!( + join.required_input_distribution().as_slice(), + [ + Distribution::KeyPartitioned(_), + Distribution::KeyPartitioned(_) + ] + )); + assert!(!join.allow_join_dynamic_filter_pushdown(session_config.options())); + + Ok(()) + } + + #[test] + fn dynamic_filter_rejects_preserve_file_partitions() -> Result<()> { + let (left_schema, right_schema, on) = build_schema_and_on()?; + let left = hash_partitioned_test_input(left_schema, "b1", 2)?; + let right = hash_partitioned_test_input(right_schema, "b1", 2)?; + + let session_config = join_dynamic_filter_session_config(1); + let join = partitioned_inner_hash_join(left, right, on)?; + + assert!(!join.allow_join_dynamic_filter_pushdown(session_config.options())); + + Ok(()) + } + + #[test] + fn dynamic_filter_rejects_mismatched_hash_counts() -> Result<()> { + let (left_schema, right_schema, on) = build_schema_and_on()?; + let left = hash_partitioned_test_input(left_schema, "b1", 2)?; + let right = hash_partitioned_test_input(right_schema, "b1", 3)?; + + let session_config = join_dynamic_filter_session_config(0); + let join = partitioned_inner_hash_join(left, right, on)?; assert!(!join.allow_join_dynamic_filter_pushdown(session_config.options())); diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 7ecace6b0e530..18d2208b35c8c 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -145,12 +145,11 @@ pub fn adjust_right_output_partitioning( .collect::>()?; Partitioning::Hash(new_exprs, *size) } - Partitioning::Range(_) => { - // Range partitioning optimizer propagation is tracked in - // https://github.com/apache/datafusion/issues/22395 - return not_impl_err!( - "Join output partitioning with range partitioning is not implemented" - ); + // Range partitioning can satisfy join input requirements, but range + // output propagation needs broader join semantics coverage. + // https://github.com/apache/datafusion/issues/22395 + Partitioning::Range(range) => { + Partitioning::UnknownPartitioning(range.partition_count()) } result => result.clone(), }; diff --git a/datafusion/sqllogictest/test_files/range_partitioning.slt b/datafusion/sqllogictest/test_files/range_partitioning.slt index 2b7a2cfdf4083..6b634d4075d15 100644 --- a/datafusion/sqllogictest/test_files/range_partitioning.slt +++ b/datafusion/sqllogictest/test_files/range_partitioning.slt @@ -79,16 +79,53 @@ SELECT non_range_key, SUM(value) FROM range_partitioned GROUP BY non_range_key O ########## -# TEST 3: Join on Range Partition Column -# Both inputs expose Range partitioning on range_key. Join planning currently -# reaches the unsupported Range output-partitioning path; later optimizer PRs -# can replace this baseline with a successful plan and result test. +# TEST 3: Range Join Avoids Hash Repartition +# Both inputs expose compatible Range partitioning on range_key. Planning can +# satisfy the partitioned join without inserting Hash repartitioning. ########## -query error This feature is not implemented: Join output partitioning with range partitioning is not implemented -SELECT l.range_key, l.value, r.value +query TT +EXPLAIN SELECT l.range_key, l.value, r.value FROM range_partitioned l JOIN range_partitioned r ON l.range_key = r.range_key; +---- +physical_plan +01)HashJoinExec: mode=Partitioned, join_type=Inner, on=[(range_key@0, range_key@0)], projection=[range_key@0, value@1, value@3] +02)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false +03)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false + +query III +SELECT l.range_key, l.value, r.value +FROM range_partitioned l +JOIN range_partitioned r ON l.range_key = r.range_key +ORDER BY l.range_key; +---- +1 10 10 +5 50 50 +10 100 100 +15 150 150 +20 200 200 +25 250 250 +30 300 300 +35 350 350 + +########## +# TEST 3b: Non-Range Join Repartitions +# The source Range partitioning does not satisfy a join on non_range_key, so +# planning still inserts Hash repartitioning. +########## + +query TT +EXPLAIN SELECT l.non_range_key, l.value, r.value +FROM range_partitioned l +JOIN range_partitioned r ON l.non_range_key = r.non_range_key; +---- +physical_plan +01)HashJoinExec: mode=Partitioned, join_type=Inner, on=[(non_range_key@0, non_range_key@0)], projection=[non_range_key@0, value@1, value@3] +02)--RepartitionExec: partitioning=Hash([non_range_key@0], 4), input_partitions=4 +03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[non_range_key, value], output_partitioning=UnknownPartitioning(4), file_type=csv, has_header=false +04)--RepartitionExec: partitioning=Hash([non_range_key@0], 4), input_partitions=4 +05)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[non_range_key, value], output_partitioning=UnknownPartitioning(4), file_type=csv, has_header=false ########## # TEST 4: Union of Range Partitioned Inputs