Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 176 additions & 2 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};

use datafusion_physical_expr::Distribution;
use datafusion_physical_expr::{Distribution, RangePartitioning, SplitPoint};
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::ExecutionPlan;
use datafusion_physical_plan::expressions::col;
Expand All @@ -70,7 +70,8 @@ use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, displayable,
DisplayAs, DisplayFormatType, ExecutionPlanProperties, Partitioning, PlanProperties,
displayable,
};
use insta::Settings;

Expand Down Expand Up @@ -325,6 +326,50 @@ fn parquet_exec_multiple_sorted(
DataSourceExec::from_data_source(config)
}

fn parquet_exec_with_output_partitioning(
output_partitioning: Partitioning,
) -> Arc<DataSourceExec> {
let file_groups = (0..output_partitioning.partition_count())
.map(|idx| FileGroup::new(vec![PartitionedFile::new(format!("p{idx}"), 100)]))
.collect();
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse("test:///").unwrap(),
Arc::new(ParquetSource::new(schema())),
)
.with_file_groups(file_groups)
.with_output_partitioning(Some(output_partitioning))
.build();

DataSourceExec::from_data_source(config)
}

fn range_partitioning(
key: &str,
split_points: impl IntoIterator<Item = i64>,
options: SortOptions,
) -> Result<Partitioning> {
let ordering = [PhysicalSortExpr {
expr: col(key, &schema())?,
options,
}]
.into();
let split_points = split_points
.into_iter()
.map(|value| SplitPoint::new(vec![ScalarValue::Int64(Some(value))]))
.collect();
Ok(Partitioning::Range(RangePartitioning::try_new(
ordering,
split_points,
)?))
}

fn hash_partitioning(key: &str, partition_count: usize) -> Result<Partitioning> {
Ok(Partitioning::Hash(
vec![col(key, &schema())?],
partition_count,
))
}

fn csv_exec() -> Arc<DataSourceExec> {
csv_exec_with_sort(vec![])
}
Expand Down Expand Up @@ -700,6 +745,135 @@ impl TestConfig {
}
}

fn partitioned_join_plan(
left_partitioning: Partitioning,
right_partitioning: Partitioning,
join_type: JoinType,
) -> Arc<dyn ExecutionPlan> {
let left = parquet_exec_with_output_partitioning(left_partitioning);
let right = parquet_exec_with_output_partitioning(right_partitioning);
let join_on = vec![(
Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
)];
hash_join_exec(left, right, &join_on, &join_type)
}

#[test]
fn inner_range_join_keeps_range_partitioning() -> Result<()> {
let join = partitioned_join_plan(
range_partitioning("a", [10, 20], SortOptions::default())?,
range_partitioning("b", [10, 20], SortOptions::default())?,
JoinType::Inner,
);

let plan = TestConfig::default().to_plan(join, &DISTRIB_DISTRIB_SORT);

assert_plan!(
plan,
@r"
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20)], 3), file_type=parquet
DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([b@1 ASC], [(10), (20)], 3), file_type=parquet
"
);

Ok(())
}

#[test]
fn inner_range_join_rehashes_different_bounds() -> Result<()> {
let join = partitioned_join_plan(
range_partitioning("a", [10, 20], SortOptions::default())?,
range_partitioning("b", [15, 20], SortOptions::default())?,
JoinType::Inner,
);

let plan = TestConfig::default().to_plan(join, &DISTRIB_DISTRIB_SORT);

assert_plan!(
plan,
@r"
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=3
DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20)], 3), file_type=parquet
RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=3
DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([b@1 ASC], [(15), (20)], 3), file_type=parquet
"
);

Ok(())
}

#[test]
fn inner_hash_join_rehashes_mismatched_counts() -> Result<()> {
let join = partitioned_join_plan(
hash_partitioning("a", 11)?,
hash_partitioning("b", 12)?,
JoinType::Inner,
);

let plan = TestConfig::default().to_plan(join, &DISTRIB_DISTRIB_SORT);

assert_plan!(
plan,
@r"
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=11
DataSourceExec: file_groups={11 groups: [[p0], [p1], [p2], [p3], [p4], [p5], [p6], [p7], [p8], [p9], [p10]]}, projection=[a, b, c, d, e], output_partitioning=Hash([a@0], 11), file_type=parquet
RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=12
DataSourceExec: file_groups={12 groups: [[p0], [p1], [p2], [p3], [p4], [p5], [p6], [p7], [p8], [p9], [p10], [p11]]}, projection=[a, b, c, d, e], output_partitioning=Hash([b@1], 12), file_type=parquet
"
);
Ok(())
}

#[test]
fn inner_hash_join_rehashes_to_target_count() -> Result<()> {
let join = partitioned_join_plan(
hash_partitioning("a", 3)?,
hash_partitioning("b", 3)?,
JoinType::Inner,
);

let plan = TestConfig::default().to_plan(join, &DISTRIB_DISTRIB_SORT);

assert_plan!(
plan,
@r"
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=3
DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Hash([a@0], 3), file_type=parquet
RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=3
DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Hash([b@1], 3), file_type=parquet
"
);
Ok(())
}

#[test]
fn non_inner_range_join_rehashes() -> Result<()> {
let join = partitioned_join_plan(
range_partitioning("a", [10, 20], SortOptions::default())?,
range_partitioning("b", [10, 20], SortOptions::default())?,
JoinType::Left,
);

let plan = TestConfig::default().to_plan(join, &DISTRIB_DISTRIB_SORT);

assert_plan!(
plan,
@r"
HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, b@1)]
RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=3
DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20)], 3), file_type=parquet
RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=3
DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]}, projection=[a, b, c, d, e], output_partitioning=Range([b@1 ASC], [(10), (20)], 3), file_type=parquet
"
);
Ok(())
}

#[test]
fn multi_hash_joins() -> Result<()> {
let left = parquet_exec();
Expand Down
54 changes: 50 additions & 4 deletions datafusion/core/tests/physical_optimizer/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use insta::assert_snapshot;
use std::sync::Arc;

use crate::physical_optimizer::test_utils::{
bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec,
projection_exec, repartition_exec, sort_exec, sort_expr, sort_expr_options,
sort_merge_join_exec, sort_preserving_merge_exec, union_exec,
bounded_window_exec, global_limit_exec, hash_join_exec, local_limit_exec,
memory_exec, projection_exec, repartition_exec, sort_exec, sort_expr,
sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, union_exec,
};

use arrow::compute::SortOptions;
Expand All @@ -30,8 +30,8 @@ use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTab
use datafusion::prelude::{CsvReadOptions, SessionContext};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{JoinType, Result, ScalarValue};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr::expressions::{Literal, col};
use datafusion_physical_expr::{Partitioning, RangePartitioning, SplitPoint};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
Expand Down Expand Up @@ -400,6 +400,52 @@ fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
);
}

fn range_partitioned_exec(
schema: &SchemaRef,
key: &str,
split_points: impl IntoIterator<Item = i32>,
) -> Result<Arc<dyn ExecutionPlan>> {
let split_points = split_points
.into_iter()
.map(|value| SplitPoint::new(vec![ScalarValue::Int32(Some(value))]))
.collect();
let partitioning = Partitioning::Range(RangePartitioning::try_new(
[sort_expr(key, schema)].into(),
split_points,
)?);
let input = memory_exec(schema);

RepartitionExec::try_new(input, partitioning)
.map(|exec| Arc::new(exec) as Arc<dyn ExecutionPlan>)
}

#[test]
fn test_partitioned_hash_join_requires_co_partitioned_children() -> Result<()> {
let schema = create_test_schema2();
let join_on = vec![(col("a", &schema)?, col("b", &schema)?)];
let right = range_partitioned_exec(&schema, "b", [10])?;

let valid_join = hash_join_exec(
range_partitioned_exec(&schema, "a", [10])?,
Arc::clone(&right),
join_on.clone(),
None,
&JoinType::Inner,
)?;
assert_sanity_check(&valid_join, true);

let invalid_join = hash_join_exec(
range_partitioned_exec(&schema, "a", [20])?,
right,
join_on,
None,
&JoinType::Inner,
)?;
assert_sanity_check(&invalid_join, false);

Ok(())
}

#[tokio::test]
/// Tests that plan is valid when the sort requirements are satisfied.
async fn test_bounded_window_agg_sort_requirement() -> Result<()> {
Expand Down
Loading
Loading