diff --git a/Cargo.lock b/Cargo.lock index c97b3aec96673..1b84a6b9792cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2633,7 +2633,6 @@ dependencies = [ "chrono", "clap", "datafusion", - "datafusion-datasource", "datafusion-spark", "datafusion-substrait", "env_logger", diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 4f83ec4b3730f..796fca372b5bb 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -325,7 +325,7 @@ pub fn evaluate_partition_prefix<'a>( } } -fn filter_partitions( +pub fn filter_partitioned_file( pf: PartitionedFile, filters: &[Expr], df_schema: &DFSchema, @@ -447,7 +447,7 @@ pub async fn pruned_partition_list<'a>( )) }) .try_filter_map(move |pf| { - futures::future::ready(filter_partitions(pf, filters, &df_schema)) + futures::future::ready(filter_partitioned_file(pf, filters, &df_schema)) }) .boxed()) } diff --git a/datafusion/catalog-listing/src/options.rs b/datafusion/catalog-listing/src/options.rs index 55840eb0e3122..8e14fce341df5 100644 --- a/datafusion/catalog-listing/src/options.rs +++ b/datafusion/catalog-listing/src/options.rs @@ -20,7 +20,7 @@ use datafusion_catalog::Session; use datafusion_common::plan_err; use datafusion_datasource::ListingTableUrl; use datafusion_datasource::file_format::FileFormat; -use datafusion_expr::SortExpr; +use datafusion_expr::{Partitioning, SortExpr}; use futures::StreamExt; use futures::TryStreamExt; use itertools::Itertools; @@ -53,6 +53,46 @@ pub struct ListingOptions { /// multiple equivalent orderings, the outer `Vec` will have a /// single element. pub file_sort_order: Vec>, + /// Declared output partitioning for scans from this table. + /// + /// Expressions are logical expressions over the full table schema. When set, + /// [`ListingTable`](crate::ListingTable) creates one file group per + /// declared output partition. When unset, file grouping uses the scan-time + /// [`SessionConfig::target_partitions`](datafusion_execution::config::SessionConfig::target_partitions). + /// + /// Files are listed in path order, split into whole-file groups across the + /// declared partition count, and then padded with trailing empty groups when + /// needed. DataFusion does not route files by partition values or validate + /// row placement, so callers must ensure file group `i` contains rows for + /// partition `i`. Layouts that require explicit file-to-partition assignment + /// are not supported. + /// + /// For example, range partitioning on column `a` with split points + /// `[10, 20, 30]` declares four output partitions. With three path-ordered + /// files, the trailing partition is preserved as empty: + /// + /// ```text + /// files in path order: f0, f1, f2 + /// + /// file groups: + /// partition 0: [f0] + /// partition 1: [f1] + /// partition 2: [f2] + /// partition 3: [] + /// ``` + /// + /// With five path-ordered files, a partition can contain multiple files: + /// + /// ```text + /// files in path order: f0, f1, f2, f3, f4 + /// + /// file groups: + /// partition 0: [f0, f1] + /// partition 1: [f2, f3] + /// partition 2: [f4] + /// partition 3: [] + /// ``` + pub output_partitioning: Option, } impl ListingOptions { @@ -66,6 +106,7 @@ impl ListingOptions { format, table_partition_cols: vec![], file_sort_order: vec![], + output_partitioning: None, } } @@ -113,6 +154,17 @@ impl ListingOptions { self } + /// Set declared output partitioning. + /// + /// See [`Self::output_partitioning`] for the contract. + pub fn with_output_partitioning( + mut self, + output_partitioning: Option, + ) -> Self { + self.output_partitioning = output_partitioning; + self + } + /// Set `table partition columns` on [`ListingOptions`] and returns self. /// /// "partition columns," used to support [Hive Partitioning], are diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 04feec2b6e437..36d85b981c06c 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -16,14 +16,17 @@ // under the License. use crate::config::SchemaSource; -use crate::helpers::{expr_applicable_for_cols, pruned_partition_list}; +use crate::helpers::{ + expr_applicable_for_cols, filter_partitioned_file, pruned_partition_list, +}; use crate::{ListingOptions, ListingTableConfig}; use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef}; use async_trait::async_trait; use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion_common::stats::Precision; use datafusion_common::{ - Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema, + Constraints, DFSchema, SchemaExt, Statistics, internal_datafusion_err, plan_err, + project_schema, }; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_groups::FileGroup; @@ -37,8 +40,10 @@ use datafusion_datasource::{ use datafusion_execution::cache::cache_manager::{FileStatisticsCache, TableScopedPath}; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; -use datafusion_physical_expr::create_lex_ordering; +use datafusion_expr::{ + Expr, Partitioning as LogicalPartitioning, TableProviderFilterPushDown, TableType, +}; +use datafusion_physical_expr::{create_lex_ordering, create_physical_partitioning}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::ExecutionPlan; @@ -448,6 +453,20 @@ fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option datafusion_common::Result { + let files = file_group + .into_inner() + .into_iter() + .map(|file| filter_partitioned_file(file, filters, df_schema)) + .filter_map(Result::transpose) + .collect::>>()?; + Ok(FileGroup::new(files)) +} + // Expressions can be used for partition pruning if they can be evaluated using // only the partition columns and there are partition columns. fn can_be_evaluated_for_partition_pruning( @@ -515,9 +534,19 @@ impl TableProvider for ListingTable { can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter) }); - // We should not limit the number of partitioned files to scan if there are filters and limit - // at the same time. This is because the limit should be applied after the filters are applied. - let statistic_file_limit = if filters.is_empty() { limit } else { None }; + let declared_output_partitioning = self.options.output_partitioning.as_ref(); + + // We should not limit files before assigning declared output partitions + // or before applying non-partition filters. + let statistic_file_limit = + if filters.is_empty() && declared_output_partitioning.is_none() { + limit + } else { + None + }; + let file_group_count = declared_output_partitioning + .and_then(LogicalPartitioning::partition_count) + .unwrap_or_else(|| state.config().target_partitions()); let ListFilesResult { file_groups: mut partitioned_file_lists, @@ -537,17 +566,19 @@ impl TableProvider for ListingTable { state.execution_props(), &partitioned_file_lists, )?; - match state - .config_options() - .execution - .split_file_groups_by_statistics + let split_file_groups_by_statistics = declared_output_partitioning.is_none() + && state + .config_options() + .execution + .split_file_groups_by_statistics; + match split_file_groups_by_statistics .then(|| { output_ordering.first().map(|output_ordering| { FileScanConfig::split_groups_by_statistics_with_target_partitions( &self.table_schema, &partitioned_file_lists, output_ordering, - state.config().target_partitions(), + file_group_count, ) }) }) @@ -555,7 +586,7 @@ impl TableProvider for ListingTable { { Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"), Some(Ok(new_groups)) => { - if new_groups.len() <= state.config().target_partitions() { + if new_groups.len() <= file_group_count { partitioned_file_lists = new_groups; } else { log::debug!( @@ -566,6 +597,41 @@ impl TableProvider for ListingTable { None => {} // no ordering required }; + let output_partitioning = if let Some(output_partitioning) = + declared_output_partitioning + { + let output_partitioning = match output_partitioning { + LogicalPartitioning::RoundRobinBatch(_) => { + return datafusion_common::not_impl_err!( + "RoundRobinBatch output partitioning is not supported for ListingTable" + ); + } + LogicalPartitioning::DistributeBy(_) => { + return datafusion_common::not_impl_err!( + "DistributeBy output partitioning is not supported for ListingTable" + ); + } + LogicalPartitioning::Hash(_, _) | LogicalPartitioning::Range(_) => { + let df_schema = DFSchema::try_from(Arc::clone(&self.table_schema))?; + create_physical_partitioning( + output_partitioning, + &df_schema, + state.execution_props(), + )? + } + }; + let partition_count = output_partitioning.partition_count(); + if partitioned_file_lists.len() != partition_count { + return plan_err!( + "ListingTable output_partitioning has {partition_count} partitions, but the scan has {} file groups", + partitioned_file_lists.len() + ); + } + Some(output_partitioning) + } else { + None + }; + let Some(object_store_url) = self.table_paths.first().map(ListingTableUrl::object_store) else { @@ -575,24 +641,23 @@ impl TableProvider for ListingTable { }; let file_source = self.create_file_source(); + let scan_config = FileScanConfigBuilder::new(object_store_url, file_source) + .with_file_groups(partitioned_file_lists) + .with_constraints(self.constraints.clone()) + .with_statistics(statistics) + .with_projection_indices(projection)? + .with_limit(limit) + .with_output_ordering(output_ordering) + .with_output_partitioning(output_partitioning) + .with_expr_adapter(self.expr_adapter_factory.clone()) + .with_partitioned_by_file_group(partitioned_by_file_group) + .build(); // create the execution plan let plan = self .options .format - .create_physical_plan( - state, - FileScanConfigBuilder::new(object_store_url, file_source) - .with_file_groups(partitioned_file_lists) - .with_constraints(self.constraints.clone()) - .with_statistics(statistics) - .with_projection_indices(projection)? - .with_limit(limit) - .with_output_ordering(output_ordering) - .with_expr_adapter(self.expr_adapter_factory.clone()) - .with_partitioned_by_file_group(partitioned_by_file_group) - .build(), - ) + .create_physical_plan(state, scan_config) .await?; Ok(ScanResult::new(plan)) @@ -704,28 +769,41 @@ impl ListingTable { /// Get the list of files for a scan as well as the file level statistics. /// The list is grouped to let the execution plan know how the files should /// be distributed to different threads / executors. + /// + /// If [`ListingOptions::output_partitioning`] is set, returns one file + /// group per declared partition, including empty trailing groups. pub async fn list_files_for_scan<'a>( &'a self, ctx: &'a dyn Session, filters: &'a [Expr], limit: Option, ) -> datafusion_common::Result { - let store = if let Some(url) = self.table_paths.first() { - ctx.runtime_env().object_store(url)? + if let Some(output_partitioning) = self.options.output_partitioning.as_ref() { + self.list_files_for_declared_output_partitioning( + ctx, + output_partitioning, + filters, + ) + .await } else { - return Ok(ListFilesResult { - file_groups: vec![], - statistics: Statistics::new_unknown(&self.file_schema), - grouped_by_partition: false, - }); - }; + self.list_files_for_regular_scan(ctx, filters, limit).await + } + } + + async fn collect_files_for_scan<'a>( + &'a self, + ctx: &'a dyn Session, + store: &'a Arc, + listing_time_filters: &'a [Expr], + file_limit: Option, + ) -> datafusion_common::Result<(FileGroup, bool)> { // list files (with partitions) let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| { pruned_partition_list( ctx, store.as_ref(), table_path, - filters, + listing_time_filters, &self.options.file_extension, &self.options.table_partition_cols, ) @@ -739,7 +817,7 @@ impl ListingTable { .map(|part_file| async { let part_file = part_file?; let (statistics, ordering) = if ctx.config().collect_statistics() { - self.do_collect_statistics_and_ordering(ctx, &store, &part_file) + self.do_collect_statistics_and_ordering(ctx, store, &part_file) .await? } else { (Arc::new(Statistics::new_unknown(&self.file_schema)), None) @@ -751,8 +829,34 @@ impl ListingTable { .boxed() .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency); - let (file_group, inexact_stats) = - get_files_with_limit(files, limit, ctx.config().collect_statistics()).await?; + get_files_with_limit(files, file_limit, ctx.config().collect_statistics()).await + } + + async fn list_files_for_regular_scan<'a>( + &'a self, + ctx: &'a dyn Session, + filters: &'a [Expr], + limit: Option, + ) -> datafusion_common::Result { + let file_group_count = ctx.config().target_partitions(); + if file_group_count == 0 { + return plan_err!( + "ListingTable requires target_partitions to be greater than zero" + ); + } + + let store = if let Some(url) = self.table_paths.first() { + ctx.runtime_env().object_store(url)? + } else { + return Ok(ListFilesResult { + file_groups: vec![], + statistics: Statistics::new_unknown(&self.file_schema), + grouped_by_partition: false, + }); + }; + let (file_group, inexact_stats) = self + .collect_files_for_scan(ctx, &store, filters, limit) + .await?; // Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N // @@ -763,26 +867,100 @@ impl ListingTable { let (file_groups, grouped_by_partition) = if threshold > 0 && !self.options.table_partition_cols.is_empty() { - let grouped = file_group - .group_by_partition_values(ctx.config().target_partitions()); + let grouped = file_group.group_by_partition_values(file_group_count); if grouped.len() >= threshold { (grouped, true) } else { let all_files: Vec<_> = grouped.into_iter().flat_map(|g| g.into_inner()).collect(); ( - FileGroup::new(all_files) - .split_files(ctx.config().target_partitions()), + FileGroup::new(all_files).split_files(file_group_count), false, ) } } else { - ( - file_group.split_files(ctx.config().target_partitions()), - false, - ) + (file_group.split_files(file_group_count), false) }; + self.list_files_result_from_groups( + ctx, + file_groups, + inexact_stats, + grouped_by_partition, + ) + } + + async fn list_files_for_declared_output_partitioning<'a>( + &'a self, + ctx: &'a dyn Session, + output_partitioning: &LogicalPartitioning, + filters: &'a [Expr], + ) -> datafusion_common::Result { + let Some(file_group_count) = output_partitioning.partition_count() else { + return datafusion_common::not_impl_err!( + "DistributeBy output partitioning is not supported for ListingTable" + ); + }; + if file_group_count == 0 { + return plan_err!( + "ListingTable output_partitioning requires at least one partition" + ); + } + + let store = if let Some(url) = self.table_paths.first() { + ctx.runtime_env().object_store(url)? + } else { + return Ok(ListFilesResult { + file_groups: vec![], + statistics: Statistics::new_unknown(&self.file_schema), + grouped_by_partition: false, + }); + }; + let (file_group, inexact_stats) = + self.collect_files_for_scan(ctx, &store, &[], None).await?; + let mut file_groups = file_group.split_files(file_group_count); + if !file_groups.is_empty() { + file_groups.resize_with(file_group_count, || FileGroup::new(vec![])); + } + let file_groups = + self.filter_declared_file_groups_by_partition_filters(file_groups, filters)?; + + self.list_files_result_from_groups(ctx, file_groups, inexact_stats, false) + } + + fn filter_declared_file_groups_by_partition_filters( + &self, + file_groups: Vec, + filters: &[Expr], + ) -> datafusion_common::Result> { + if filters.is_empty() { + return Ok(file_groups); + } + + let df_schema = DFSchema::from_unqualified_fields( + self.options + .table_partition_cols + .iter() + .map(|(name, data_type)| Field::new(name, data_type.clone(), true)) + .collect(), + Default::default(), + )?; + + file_groups + .into_iter() + .map(|file_group| { + filter_file_group_by_partition_filters(file_group, filters, &df_schema) + }) + .collect::>>() + } + + fn list_files_result_from_groups( + &self, + ctx: &dyn Session, + file_groups: Vec, + inexact_stats: bool, + grouped_by_partition: bool, + ) -> datafusion_common::Result { let (file_groups, stats) = compute_all_files_statistics( file_groups, self.schema(), diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 50b3855a0ab7c..39c20f9b786c2 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -123,7 +123,7 @@ mod tests { }, }; use arrow::{compute::SortOptions, record_batch::RecordBatch}; - use arrow_schema::{DataType, Field, Schema, SchemaRef}; + use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; use datafusion_catalog::TableProvider; use datafusion_catalog_listing::{ ListingOptions, ListingTable, ListingTableConfig, SchemaSource, @@ -137,13 +137,18 @@ mod tests { use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_format::FileFormat; use datafusion_expr::dml::InsertOp; - use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; + use datafusion_expr::{ + BinaryExpr, LogicalPlanBuilder, Operator, Partitioning as LogicalPartitioning, + RangePartitioning as LogicalRangePartitioning, + }; use datafusion_physical_expr::PhysicalSortExpr; - use datafusion_physical_expr::expressions::binary; + use datafusion_physical_expr::expressions::{Column, binary}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::statistics::StatisticsArgs; - use datafusion_physical_plan::{ExecutionPlanProperties, collect}; + use datafusion_physical_plan::{ + ExecutionPlanProperties, Partitioning, RangePartitioning, SplitPoint, collect, + }; use std::collections::HashMap; use std::io::Write; use std::sync::Arc; @@ -176,6 +181,21 @@ mod tests { .collect() } + fn listing_table_with_files( + ctx: &SessionContext, + files: &[&str], + table_path: &str, + options: ListingOptions, + schema: Schema, + ) -> Result { + register_test_store(ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let config = ListingTableConfig::new(ListingTableUrl::parse(table_path)?) + .with_listing_options(options) + .with_schema(Arc::new(schema)); + ListingTable::try_new(config) + } + #[tokio::test] async fn test_schema_source_tracking_comprehensive() -> Result<()> { let ctx = SessionContext::new(); @@ -1289,6 +1309,236 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_list_files_uses_declared_output_partitioning_count() -> Result<()> { + let files = ["bucket/key-prefix/file0", "bucket/key-prefix/file1"]; + + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_target_partitions(1), + ); + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_output_partitioning(Some(LogicalPartitioning::Range( + LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![ + SplitPoint::new(vec![ScalarValue::from(10i32)]), + SplitPoint::new(vec![ScalarValue::from(20i32)]), + SplitPoint::new(vec![ScalarValue::from(30i32)]), + ], + )?, + ))); + let table = listing_table_with_files( + &ctx, + &files, + "test:///bucket/key-prefix/", + opt, + Schema::new(vec![Field::new("a", DataType::Int32, false)]), + )?; + + let result = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let group_sizes = result + .file_groups + .iter() + .map(|group| group.len()) + .collect::>(); + + assert_eq!(group_sizes, vec![1, 1, 0, 0]); + + Ok(()) + } + + #[tokio::test] + async fn test_range_output_partitioning_normalizes_split_point_types() -> Result<()> { + let files = ["bucket/key-prefix/file0"]; + + let ctx = SessionContext::new(); + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::TimestampNanosecond( + Some(123_000_000_000), + None, + )])], + )?); + let expected_output_partitioning = + Partitioning::Range(RangePartitioning::try_new( + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + SortOptions::default(), + )]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::TimestampSecond( + Some(123), + None, + )])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_output_partitioning(Some(output_partitioning)); + let table = listing_table_with_files( + &ctx, + &files, + "test:///bucket/key-prefix/", + opt, + Schema::new(vec![Field::new( + "a", + DataType::Timestamp(TimeUnit::Second, None), + false, + )]), + )?; + + let scan = table.scan(&ctx.state(), None, &[], None).await?; + assert_eq!(scan.output_partitioning(), &expected_output_partitioning); + + Ok(()) + } + + #[tokio::test] + async fn test_range_output_partitioning_rejects_invalid_split_point_type() + -> Result<()> { + let files = ["bucket/key-prefix/file0"]; + + let ctx = SessionContext::new(); + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::Utf8(Some( + "not-an-int".to_string(), + ))])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_output_partitioning(Some(output_partitioning)); + let table = listing_table_with_files( + &ctx, + &files, + "test:///bucket/key-prefix/", + opt, + Schema::new(vec![Field::new("a", DataType::Int32, false)]), + )?; + + let err = table.scan(&ctx.state(), None, &[], None).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Range output partitioning split point 0 value 0 with type Utf8 cannot be represented exactly as ordering expression type Int32" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_range_output_partitioning_rejects_lossy_timestamp_split_point() + -> Result<()> { + let files = ["bucket/key-prefix/file0"]; + + let ctx = SessionContext::new(); + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::TimestampNanosecond( + Some(123_456), + None, + )])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_output_partitioning(Some(output_partitioning)); + let table = listing_table_with_files( + &ctx, + &files, + "test:///bucket/key-prefix/", + opt, + Schema::new(vec![Field::new( + "a", + DataType::Timestamp(TimeUnit::Second, None), + false, + )]), + )?; + + let err = table.scan(&ctx.state(), None, &[], None).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Range output partitioning split point 0 value 0 with type Timestamp(ns) cannot be represented exactly as ordering expression type Timestamp(s)" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_partition_filter_preserves_declared_output_partitioning() -> Result<()> + { + let files = ["bucket/test/pid=1/file1", "bucket/test/pid=2/file2"]; + + let ctx = SessionContext::new(); + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("pid").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::from(2i32)])], + )?); + let expected_output_partitioning = + Partitioning::Range(RangePartitioning::try_new( + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("pid", 1)), + SortOptions::default(), + )]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::from(2i32)])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_table_partition_cols(vec![("pid".to_string(), DataType::Int32)]) + .with_output_partitioning(Some(output_partitioning.clone())); + + let table = listing_table_with_files( + &ctx, + &files, + "test:///bucket/test/", + opt, + Schema::new(vec![Field::new("a", DataType::Boolean, false)]), + )?; + + let unfiltered = table.scan(&ctx.state(), None, &[], None).await?; + assert_eq!( + unfiltered.output_partitioning(), + &expected_output_partitioning + ); + + let filter = Expr::eq(col("pid"), lit(2_i32)); + let file_groups = table + .list_files_for_scan(&ctx.state(), std::slice::from_ref(&filter), None) + .await? + .file_groups + .into_iter() + .map(|group| { + group + .into_inner() + .into_iter() + .map(|file| file.path().to_string()) + .collect::>() + }) + .collect::>(); + assert_eq!( + file_groups, + vec![ + Vec::::new(), + vec!["bucket/test/pid=2/file2".to_string()] + ] + ); + + let filtered = table.scan(&ctx.state(), None, &[filter], None).await?; + assert_eq!( + filtered.output_partitioning(), + &expected_output_partitioning + ); + + Ok(()) + } + #[tokio::test] async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> { let files = [ diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 190a08da12222..9cd2fd1131a87 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -32,10 +32,11 @@ use crate::logical_expr::{ Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, Values, Window, }; use crate::logical_expr::{ - Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType, Repartition, - UserDefinedLogicalNode, + Expr, LogicalPlan, PlanType, Repartition, UserDefinedLogicalNode, +}; +use crate::physical_expr::{ + create_physical_expr, create_physical_exprs, create_physical_partitioning, }; -use crate::physical_expr::{create_physical_expr, create_physical_exprs}; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::explain::ExplainExec; @@ -52,8 +53,8 @@ use crate::physical_plan::union::UnionExec; use crate::physical_plan::unnest::UnnestExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ - ExecutionPlan, ExecutionPlanProperties, InputOrderMode, Partitioning, PhysicalExpr, - WindowExpr, displayable, windows, + ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr, WindowExpr, + displayable, windows, }; use crate::schema_equivalence::schema_satisfied_by; @@ -98,7 +99,7 @@ use datafusion_physical_expr::aggregate::{ }; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::{ - LexOrdering, PhysicalSortExpr, RangePartitioning, create_physical_sort_exprs, + LexOrdering, PhysicalSortExpr, create_physical_sort_exprs, }; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::empty::EmptyExec; @@ -1251,41 +1252,11 @@ impl DefaultPhysicalPlanner { }) => { let physical_input = children.one()?; let input_dfschema = input.as_ref().schema(); - let physical_partitioning = match partitioning_scheme { - LogicalPartitioning::RoundRobinBatch(n) => { - Partitioning::RoundRobinBatch(*n) - } - LogicalPartitioning::Hash(expr, n) => { - let runtime_expr = expr - .iter() - .map(|e| { - create_physical_expr(e, input_dfschema, execution_props) - }) - .collect::>>()?; - Partitioning::Hash(runtime_expr, *n) - } - LogicalPartitioning::Range(range) => { - let sort_exprs = create_physical_sort_exprs( - range.ordering(), - input_dfschema, - execution_props, - )?; - let ordering = LexOrdering::new(sort_exprs).ok_or_else(|| { - internal_datafusion_err!( - "Range repartitioning requires non-empty ordering" - ) - })?; - Partitioning::Range(RangePartitioning::try_new( - ordering, - range.split_points().to_vec(), - )?) - } - LogicalPartitioning::DistributeBy(_) => { - return not_impl_err!( - "Physical plan does not support DistributeBy partitioning" - ); - } - }; + let physical_partitioning = create_physical_partitioning( + partitioning_scheme, + input_dfschema, + execution_props, + )?; Arc::new(RepartitionExec::try_new( physical_input, physical_partitioning, @@ -3249,8 +3220,8 @@ mod tests { use crate::datasource::MemTable; use crate::datasource::file_format::options::CsvReadOptions; use crate::physical_plan::{ - DisplayAs, DisplayFormatType, PlanProperties, SendableRecordBatchStream, - expressions, + DisplayAs, DisplayFormatType, Partitioning, PlanProperties, + SendableRecordBatchStream, expressions, }; use crate::prelude::{SessionConfig, SessionContext}; use crate::test_util::{scan_empty, scan_empty_with_partitions}; @@ -3271,8 +3242,8 @@ mod tests { use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::{ Accumulator, AggregateUDF, AggregateUDFImpl, ExprFunctionExt, LogicalPlanBuilder, - RangePartitioning, Signature, TableSource, UserDefinedLogicalNodeCore, - Volatility, WindowFunctionDefinition, col, lit, + Partitioning as LogicalPartitioning, RangePartitioning, Signature, TableSource, + UserDefinedLogicalNodeCore, Volatility, WindowFunctionDefinition, col, lit, }; use datafusion_functions_aggregate::count::{count_all, count_udaf}; use datafusion_functions_aggregate::expr_fn::sum; diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index b1ba0584c96a0..21d733458cbc9 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -40,7 +40,7 @@ use datafusion_expr::Operator; use crate::source::OpenArgs; use datafusion_physical_expr::expressions::{BinaryExpr, Column}; -use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr::projection::{ProjectionExprs, ProjectionMapping}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; @@ -205,7 +205,17 @@ pub struct FileScanConfig { /// /// If the number of file partitions > target_partitions, the file partitions will be grouped /// in a round-robin fashion such that number of file partitions = target_partitions. + /// + /// Follow-up: remove this redundant field in favor of + /// `output_partitioning`, see . pub partitioned_by_file_group: bool, + /// Declared physical output partitioning for this scan. + /// + /// Expressions are against the full table schema, before scan projection or + /// filtering. `ListingTable` validates partition count before building the + /// scan, and direct builders with mismatched counts fall back to + /// `UnknownPartitioning`. + pub output_partitioning: Option, } /// A builder for [`FileScanConfig`]'s. @@ -274,6 +284,7 @@ pub struct FileScanConfigBuilder { file_groups: Vec, statistics: Option, output_ordering: Vec, + output_partitioning: Option, file_compression_type: Option, batch_size: Option, expr_adapter_factory: Option>, @@ -297,6 +308,7 @@ impl FileScanConfigBuilder { file_groups: vec![], statistics: None, output_ordering: vec![], + output_partitioning: None, file_compression_type: None, limit: None, preserve_order: false, @@ -463,6 +475,15 @@ impl FileScanConfigBuilder { self } + /// Set declared physical output partitioning for this scan. + pub fn with_output_partitioning( + mut self, + output_partitioning: Option, + ) -> Self { + self.output_partitioning = output_partitioning; + self + } + /// Set the file compression type pub fn with_file_compression_type( mut self, @@ -521,6 +542,7 @@ impl FileScanConfigBuilder { file_groups, statistics, output_ordering, + output_partitioning, file_compression_type, batch_size, expr_adapter_factory: expr_adapter, @@ -550,6 +572,7 @@ impl FileScanConfigBuilder { expr_adapter_factory: expr_adapter, statistics, partitioned_by_file_group, + output_partitioning, } } } @@ -562,6 +585,7 @@ impl From for FileScanConfigBuilder { file_groups: config.file_groups, statistics: Some(config.statistics), output_ordering: config.output_ordering, + output_partitioning: config.output_partitioning, file_compression_type: Some(config.file_compression_type), limit: config.limit, preserve_order: config.preserve_order, @@ -573,6 +597,52 @@ impl From for FileScanConfigBuilder { } } +fn hash_partitioning_from_partition_fields( + schema: &Schema, + partition_cols: &Fields, + partition_count: usize, +) -> Option { + if partition_cols.is_empty() { + return None; + } + + let mut exprs: Vec> = Vec::with_capacity(partition_cols.len()); + for partition_col in partition_cols { + let name = partition_col.name(); + let idx = schema + .fields() + .iter() + .position(|field| field.name() == name)?; + exprs.push(Arc::new(Column::new(name, idx))); + } + + Some(Partitioning::Hash(exprs, partition_count)) +} + +fn project_output_partitioning( + partitioning: &Partitioning, + mapping: &ProjectionMapping, + input_schema: &SchemaRef, + partition_count: usize, +) -> Partitioning { + let input_eq_properties = EquivalenceProperties::new(Arc::clone(input_schema)); + match partitioning { + Partitioning::Hash(exprs, _) => { + let projected_exprs = input_eq_properties + .project_expressions(exprs, mapping) + .collect::>>(); + projected_exprs + .map(|exprs| Partitioning::Hash(exprs, partition_count)) + .unwrap_or_else(|| Partitioning::UnknownPartitioning(partition_count)) + } + Partitioning::Range(_) + | Partitioning::RoundRobinBatch(_) + | Partitioning::UnknownPartitioning(_) => { + partitioning.project(mapping, &input_eq_properties) + } + } +} + impl DataSource for FileScanConfig { fn open( &self, @@ -660,6 +730,10 @@ impl DataSource for FileScanConfig { display_orderings(f, &orderings)?; + if self.output_partitioning.is_some() { + write!(f, ", output_partitioning={}", self.output_partitioning())?; + } + if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; } @@ -683,10 +757,9 @@ impl DataSource for FileScanConfig { repartition_file_min_size: usize, output_ordering: Option, ) -> Result>> { - // When files are grouped by partition values, we cannot allow byte-range - // splitting. It would mix rows from different partition values across - // file groups, breaking the Hash partitioning. - if self.partitioned_by_file_group { + // When file groups define output partitioning, repartitioning files + // would invalidate the partition-to-file-group mapping. + if self.output_partitioning.is_some() || self.partitioned_by_file_group { return Ok(None); } @@ -702,13 +775,18 @@ impl DataSource for FileScanConfig { /// Returns the output partitioning for this file scan. /// - /// When `partitioned_by_file_group` is true, this returns `Partitioning::Hash` on - /// the Hive partition columns, allowing the optimizer to skip hash repartitioning - /// for aggregates and joins on those columns. + /// When `output_partitioning` is set, this returns the declared partitioning + /// after applying scan projection. When `partitioned_by_file_group` is true, + /// this returns `Partitioning::Hash` on the Hive partition columns, allowing + /// the optimizer to skip hash repartitioning for aggregates and joins on + /// those columns. + /// + /// If projection or partition count validation fails, this returns + /// `UnknownPartitioning`. /// /// Tradeoffs - /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with - /// `GROUP BY` or `ORDER BY` on partition columns. + /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries whose + /// required distribution is satisfied by the scan's output partitioning. /// - Cost: Files are grouped by partition values rather than split by byte /// ranges, which may reduce I/O parallelism when partition sizes are uneven. /// For simple aggregations without `ORDER BY`, this cost may outweigh the benefit. @@ -717,39 +795,45 @@ impl DataSource for FileScanConfig { /// - Idea: Could allow byte-range splitting within partition-aware groups, /// preserving I/O parallelism while maintaining partition semantics. fn output_partitioning(&self) -> Partitioning { - if self.partitioned_by_file_group { - let partition_cols = self.table_partition_cols(); - if !partition_cols.is_empty() { - let projected_schema = match self.projected_schema() { - Ok(schema) => schema, - Err(_) => { - debug!( - "Could not get projected schema, falling back to UnknownPartitioning." - ); - return Partitioning::UnknownPartitioning(self.file_groups.len()); - } - }; - - // Build Column expressions for partition columns based on their - // position in the projected schema - let mut exprs: Vec> = Vec::new(); - for partition_col in partition_cols { - if let Some((idx, _)) = projected_schema - .fields() - .iter() - .enumerate() - .find(|(_, f)| f.name() == partition_col.name()) - { - exprs.push(Arc::new(Column::new(partition_col.name(), idx))); - } - } + let Some(output_partitioning) = self.output_partitioning.clone().or_else(|| { + self.partitioned_by_file_group.then(|| { + hash_partitioning_from_partition_fields( + self.file_source.table_schema().table_schema(), + self.table_partition_cols(), + self.file_groups.len(), + ) + })? + }) else { + return Partitioning::UnknownPartitioning(self.file_groups.len()); + }; + if output_partitioning.partition_count() != self.file_groups.len() { + warn!( + "Declared output partitioning has {} partitions, but file scan has {} file groups. Falling back to UnknownPartitioning.", + output_partitioning.partition_count(), + self.file_groups.len() + ); + return Partitioning::UnknownPartitioning(self.file_groups.len()); + } - if exprs.len() == partition_cols.len() { - return Partitioning::Hash(exprs, self.file_groups.len()); + if let Some(projection) = self.file_source.projection() { + let schema = self.file_source.table_schema().table_schema(); + return match projection.projection_mapping(schema) { + Ok(mapping) => project_output_partitioning( + &output_partitioning, + &mapping, + schema, + self.file_groups.len(), + ), + Err(e) => { + debug!( + "Could not project output partitioning, falling back to UnknownPartitioning: {e}" + ); + Partitioning::UnknownPartitioning(self.file_groups.len()) } - } + }; } - Partitioning::UnknownPartitioning(self.file_groups.len()) + + output_partitioning } /// Computes the effective equivalence properties of this file scan, taking @@ -1043,7 +1127,10 @@ impl DataSource for FileScanConfig { /// when file order must be preserved or the file groups define the output /// partitioning needed for the rest of the plan fn create_sibling_state(&self) -> Option> { - if self.preserve_order || self.partitioned_by_file_group { + if self.preserve_order + || self.output_partitioning.is_some() + || self.partitioned_by_file_group + { return None; } @@ -2459,6 +2546,58 @@ mod tests { assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_))); } + #[test] + fn test_declared_output_partitioning_projects_with_scan() { + let file_schema = aggr_test_schema(); + let output_partitioning = + Partitioning::Hash(vec![Arc::new(Column::new("c2", 1))], 4); + + let mut config = config_for_projection( + Arc::clone(&file_schema), + Some(vec![1, 2]), + Statistics::new_unknown(&file_schema), + vec![], + ); + config.file_groups = vec![ + FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f4.parquet".to_string(), 1024)]), + ]; + config.output_partitioning = Some(output_partitioning); + + match config.output_partitioning() { + Partitioning::Hash(exprs, num_partitions) => { + assert_eq!(num_partitions, 4); + assert_eq!(exprs.len(), 1); + let column = exprs[0].downcast_ref::().unwrap(); + assert_eq!(column.name(), "c2"); + assert_eq!(column.index(), 0); + } + _ => panic!("Expected Hash partitioning"), + } + + let mut config = config_for_projection( + Arc::clone(&file_schema), + Some(vec![2]), + Statistics::new_unknown(&file_schema), + vec![], + ); + config.file_groups = vec![ + FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f4.parquet".to_string(), 1024)]), + ]; + config.output_partitioning = + Some(Partitioning::Hash(vec![Arc::new(Column::new("c2", 1))], 4)); + + assert!(matches!( + config.output_partitioning(), + Partitioning::UnknownPartitioning(4) + )); + } + #[test] fn test_output_partitioning_no_partition_columns() { let file_schema = aggr_test_schema(); diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index b55bd70bdf185..67419944cfde6 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -66,8 +66,9 @@ pub use expressions::{DynamicFilterTracker, DynamicFilterTracking}; pub use partitioning::{Distribution, Partitioning, RangePartitioning}; pub use physical_expr::{ add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering, - create_ordering, create_physical_sort_expr, create_physical_sort_exprs, - physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, + create_ordering, create_physical_partitioning, create_physical_sort_expr, + create_physical_sort_exprs, physical_exprs_bag_equal, physical_exprs_contains, + physical_exprs_equal, }; pub use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, PhysicalExprRef}; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 77ede76e1daa8..6ff5be4e38229 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -21,15 +21,17 @@ use crate::expressions::{self, Column}; use crate::{LexOrdering, PhysicalSortExpr, create_physical_expr}; use arrow::compute::SortOptions; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{DFSchema, HashMap}; +use datafusion_common::{DFSchema, HashMap, ScalarValue, SplitPoint}; use datafusion_common::{Result, plan_err}; use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::{Expr, SortExpr}; +use datafusion_expr::{Expr, Partitioning as LogicalPartitioning, SortExpr}; +use datafusion_expr_common::casts::try_cast_literal_to_type; use itertools::izip; // Exports: +use crate::{Partitioning, RangePartitioning}; pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr; /// Adds the `offset` value to `Column` indices inside `expr`. This function is @@ -216,6 +218,99 @@ pub fn create_physical_sort_exprs( .collect() } +/// Create physical partitioning from logical partitioning. +pub fn create_physical_partitioning( + partitioning: &LogicalPartitioning, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, +) -> Result { + match partitioning { + LogicalPartitioning::RoundRobinBatch(n) => Ok(Partitioning::RoundRobinBatch(*n)), + LogicalPartitioning::Hash(exprs, partition_count) => { + let exprs = exprs + .iter() + .map(|expr| create_physical_expr(expr, input_dfschema, execution_props)) + .collect::>>()?; + Ok(Partitioning::Hash(exprs, *partition_count)) + } + LogicalPartitioning::Range(range) => { + let ordering = create_physical_sort_exprs( + range.ordering(), + input_dfschema, + execution_props, + )?; + let Some(ordering) = LexOrdering::new(ordering) else { + return plan_err!("Range partitioning requires non-empty ordering"); + }; + let split_points = normalize_range_split_points( + &ordering, + range.split_points(), + input_dfschema.as_arrow(), + )?; + let range = RangePartitioning::try_new(ordering, split_points)?; + Ok(Partitioning::Range(range)) + } + LogicalPartitioning::DistributeBy(_) => { + datafusion_common::not_impl_err!( + "Physical plan does not support DistributeBy partitioning" + ) + } + } +} + +fn normalize_range_split_points( + ordering: &LexOrdering, + split_points: &[SplitPoint], + schema: &Schema, +) -> Result> { + split_points + .iter() + .enumerate() + .map(|(split_idx, split_point)| { + let values = split_point + .values() + .iter() + .zip(ordering.iter()) + .enumerate() + .map(|(value_idx, (value, sort_expr))| { + let target_type = sort_expr.expr.data_type(schema)?; + normalize_range_split_point_value( + value, + &target_type, + split_idx, + value_idx, + ) + }) + .collect::>>()?; + Ok(SplitPoint::new(values)) + }) + .collect() +} + +fn normalize_range_split_point_value( + value: &ScalarValue, + target_type: &DataType, + split_idx: usize, + value_idx: usize, +) -> Result { + let value_type = value.data_type(); + if &value_type == target_type { + return Ok(value.clone()); + } + + if let Some(casted) = try_cast_literal_to_type(value, target_type) { + // Split points define physical partition boundaries, so normalization + // must reject casts that would change the advertised boundary. + if try_cast_literal_to_type(&casted, &value_type).as_ref() == Some(value) { + return Ok(casted); + } + } + + plan_err!( + "Range output partitioning split point {split_idx} value {value_idx} with type {value_type} cannot be represented exactly as ordering expression type {target_type}" + ) +} + pub fn add_offset_to_physical_sort_exprs( sort_exprs: impl IntoIterator, offset: isize, diff --git a/datafusion/proto-models/proto/datafusion.proto b/datafusion/proto-models/proto/datafusion.proto index 8745100e5590d..d68973c44ecbf 100644 --- a/datafusion/proto-models/proto/datafusion.proto +++ b/datafusion/proto-models/proto/datafusion.proto @@ -1191,6 +1191,7 @@ message FileScanExecConf { optional ProjectionExprs projection_exprs = 13; optional bool partitioned_by_file_group = 14; + optional Partitioning output_partitioning = 15; } message ParquetScanExecNode { diff --git a/datafusion/proto-models/src/generated/pbjson.rs b/datafusion/proto-models/src/generated/pbjson.rs index a85f807bc8020..f8e21030356b0 100644 --- a/datafusion/proto-models/src/generated/pbjson.rs +++ b/datafusion/proto-models/src/generated/pbjson.rs @@ -6966,6 +6966,9 @@ impl serde::Serialize for FileScanExecConf { if self.partitioned_by_file_group.is_some() { len += 1; } + if self.output_partitioning.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FileScanExecConf", len)?; if !self.file_groups.is_empty() { struct_ser.serialize_field("fileGroups", &self.file_groups)?; @@ -7005,6 +7008,9 @@ impl serde::Serialize for FileScanExecConf { if let Some(v) = self.partitioned_by_file_group.as_ref() { struct_ser.serialize_field("partitionedByFileGroup", v)?; } + if let Some(v) = self.output_partitioning.as_ref() { + struct_ser.serialize_field("outputPartitioning", v)?; + } struct_ser.end() } } @@ -7034,6 +7040,8 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "projectionExprs", "partitioned_by_file_group", "partitionedByFileGroup", + "output_partitioning", + "outputPartitioning", ]; #[allow(clippy::enum_variant_names)] @@ -7050,6 +7058,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { BatchSize, ProjectionExprs, PartitionedByFileGroup, + OutputPartitioning, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7083,6 +7092,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "batchSize" | "batch_size" => Ok(GeneratedField::BatchSize), "projectionExprs" | "projection_exprs" => Ok(GeneratedField::ProjectionExprs), "partitionedByFileGroup" | "partitioned_by_file_group" => Ok(GeneratedField::PartitionedByFileGroup), + "outputPartitioning" | "output_partitioning" => Ok(GeneratedField::OutputPartitioning), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7114,6 +7124,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { let mut batch_size__ = None; let mut projection_exprs__ = None; let mut partitioned_by_file_group__ = None; + let mut output_partitioning__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::FileGroups => { @@ -7193,6 +7204,12 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { } partitioned_by_file_group__ = map_.next_value()?; } + GeneratedField::OutputPartitioning => { + if output_partitioning__.is_some() { + return Err(serde::de::Error::duplicate_field("outputPartitioning")); + } + output_partitioning__ = map_.next_value()?; + } } } Ok(FileScanExecConf { @@ -7208,6 +7225,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { batch_size: batch_size__, projection_exprs: projection_exprs__, partitioned_by_file_group: partitioned_by_file_group__, + output_partitioning: output_partitioning__, }) } } diff --git a/datafusion/proto-models/src/generated/prost.rs b/datafusion/proto-models/src/generated/prost.rs index 4da38881a88fd..675ead23f4914 100644 --- a/datafusion/proto-models/src/generated/prost.rs +++ b/datafusion/proto-models/src/generated/prost.rs @@ -1816,6 +1816,8 @@ pub struct FileScanExecConf { pub projection_exprs: ::core::option::Option, #[prost(bool, optional, tag = "14")] pub partitioned_by_file_group: ::core::option::Option, + #[prost(message, optional, tag = "15")] + pub output_partitioning: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetScanExecNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 7bd1a3dd66d01..53ff4a41d466e 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -552,6 +552,12 @@ pub fn parse_protobuf_file_scan_config( )?; output_ordering.extend(LexOrdering::new(sort_exprs)); } + let output_partitioning = parse_protobuf_partitioning( + proto.output_partitioning.as_ref(), + ctx, + &schema, + proto_converter, + )?; // Parse projection expressions if present and apply to file source let file_source = if let Some(proto_projection_exprs) = &proto.projection_exprs { @@ -580,15 +586,18 @@ pub fn parse_protobuf_file_scan_config( file_source }; - let config = FileScanConfigBuilder::new(object_store_url, file_source) + let mut config_builder = FileScanConfigBuilder::new(object_store_url, file_source) .with_file_groups(file_groups) .with_constraints(constraints) .with_statistics(statistics) .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_output_ordering(output_ordering) - .with_batch_size(proto.batch_size.map(|s| s as usize)) - .with_partitioned_by_file_group(proto.partitioned_by_file_group.unwrap_or(false)) - .build(); + .with_output_partitioning(output_partitioning) + .with_batch_size(proto.batch_size.map(|s| s as usize)); + if proto.partitioned_by_file_group.unwrap_or(false) { + config_builder = config_builder.with_partitioned_by_file_group(true); + } + let config = config_builder.build(); Ok(config) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 7310c0928eee4..4614c4f002169 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -498,6 +498,11 @@ pub fn serialize_file_scan_config( serialize_physical_sort_exprs(order.to_vec(), codec, proto_converter)?; output_orderings.push(ordering) } + let output_partitioning = conf + .output_partitioning + .as_ref() + .map(|partitioning| serialize_partitioning(partitioning, codec, proto_converter)) + .transpose()?; // Fields must be added to the schema so that they can persist in the protobuf, // and then they are to be removed from the schema in `parse_protobuf_file_scan_config` @@ -558,6 +563,7 @@ pub fn serialize_file_scan_config( batch_size: conf.batch_size.map(|s| s as u64), projection_exprs, partitioned_by_file_group: Some(conf.partitioned_by_file_group), + output_partitioning, }) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8e80467788598..2022857d4e59d 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -4080,10 +4080,26 @@ fn test_custom_node_with_dynamic_filter_dedup_roundtrip() -> Result<()> { Ok(()) } +fn roundtrip_file_scan_config(scan_config: FileScanConfig) -> Result { + let exec_plan: Arc = DataSourceExec::from_data_source(scan_config); + let ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + let proto_converter = DefaultPhysicalProtoConverter {}; + let result_plan = + roundtrip_test_and_return(exec_plan, &ctx, &codec, &proto_converter)?; + + let data_source_exec = result_plan + .downcast_ref::() + .expect("Expected DataSourceExec"); + let file_scan_config = data_source_exec + .data_source() + .downcast_ref::() + .expect("Expected FileScanConfig"); + Ok(file_scan_config.clone()) +} + #[test] fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> { - use datafusion::datasource::physical_plan::FileScanConfig; - let file_schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); @@ -4096,34 +4112,65 @@ fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> { .with_partitioned_by_file_group(true) .build(); - assert!(scan_config.partitioned_by_file_group); + assert!(roundtrip_file_scan_config(scan_config)?.partitioned_by_file_group); + Ok(()) +} - let exec_plan: Arc = DataSourceExec::from_data_source(scan_config); +#[test] +fn roundtrip_parquet_exec_output_partitioning() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); + let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); + let output_partitioning = + Partitioning::Hash(vec![Arc::new(Column::new("col", 0))], 1); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )])]) + .with_output_partitioning(Some(output_partitioning.clone())) + .build(); - let ctx = SessionContext::new(); - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DefaultPhysicalProtoConverter {}; - let bytes = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan), - &codec, - &proto_converter, - )?; - let result_plan = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &proto_converter, - )?; + assert_eq!( + roundtrip_file_scan_config(scan_config)?.output_partitioning, + Some(output_partitioning) + ); - let data_source_exec = result_plan - .downcast_ref::() - .expect("Expected DataSourceExec"); - let file_scan_config = data_source_exec - .data_source() - .downcast_ref::() - .expect("Expected FileScanConfig"); + Ok(()) +} + +#[test] +fn roundtrip_parquet_exec_range_output_partitioning() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, false)])); + let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); + let output_partitioning = Partitioning::Range(RangePartitioning::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new( + "col", 0, + )))]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(10))])], + )); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(vec![ + FileGroup::new(vec![PartitionedFile::new( + "/path/to/file-1.parquet".to_string(), + 1024, + )]), + FileGroup::new(vec![PartitionedFile::new( + "/path/to/file-2.parquet".to_string(), + 1024, + )]), + ]) + .with_output_partitioning(Some(output_partitioning.clone())) + .build(); - assert!(file_scan_config.partitioned_by_file_group); + assert_eq!( + roundtrip_file_scan_config(scan_config)?.output_partitioning, + Some(output_partitioning) + ); Ok(()) } diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index a0c18c90867c7..13493d16c05e5 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -47,7 +47,6 @@ bytes = { workspace = true, optional = true } chrono = { workspace = true, optional = true } clap = { version = "4.5.60", features = ["derive", "env"] } datafusion = { workspace = true, default-features = true, features = ["avro"] } -datafusion-datasource = { workspace = true } datafusion-spark = { workspace = true, features = ["core"] } datafusion-substrait = { workspace = true, default-features = true, optional = true } futures = { workspace = true } diff --git a/datafusion/sqllogictest/src/test_context/range_partitioning.rs b/datafusion/sqllogictest/src/test_context/range_partitioning.rs index 88e49708baf60..a3e16eefd881a 100644 --- a/datafusion/sqllogictest/src/test_context/range_partitioning.rs +++ b/datafusion/sqllogictest/src/test_context/range_partitioning.rs @@ -15,236 +15,94 @@ // specific language governing permissions and limitations // under the License. -use std::fmt; +use std::fs::{create_dir_all, remove_dir_all, write}; +use std::path::Path; use std::sync::Arc; -use arrow::array::Int32Array; -use arrow::compute::SortOptions; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; -use async_trait::async_trait; -use datafusion::catalog::Session; -use datafusion::common::{Result, ScalarValue, project_schema}; -use datafusion::datasource::source::{DataSource, DataSourceExec}; -use datafusion::datasource::{TableProvider, TableType}; -use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::Expr; -use datafusion::physical_expr::EquivalenceProperties; -use datafusion::physical_expr::expressions::col as physical_col; -use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; -use datafusion::physical_plan::execution_plan::SchedulingType; -use datafusion::physical_plan::projection::ProjectionExprs; -use datafusion::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RangePartitioning, - SendableRecordBatchStream, SplitPoint, Statistics, +use arrow::datatypes::{DataType, Field, Schema}; +use datafusion::common::{ScalarValue, SplitPoint}; +use datafusion::datasource::file_format::csv::CsvFormat; +use datafusion::datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; +use datafusion::logical_expr::{Partitioning, RangePartitioning, col}; use datafusion::prelude::SessionContext; -use datafusion_datasource::memory::MemorySourceConfig; // ============================================================================== // Range Partitioned Table (sqllogictest-only) // ============================================================================== -/// Simple range-partitioned table for testing before declaring such tables is -/// supported via SQL. -#[derive(Debug)] -struct RangePartitionedTable { - schema: SchemaRef, - partitions: Vec>, - range_column_index: usize, - split_points: Vec, -} - -#[async_trait] -impl TableProvider for RangePartitionedTable { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - state: &dyn Session, - projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> Result> { - let projected_schema = project_schema(&self.schema, projection)?; - let mut source = MemorySourceConfig::try_new( - &self.partitions, - Arc::clone(&self.schema), - projection.cloned(), - )?; - source = source.with_show_sizes(state.config_options().explain.show_sizes); - - let output_partitioning = - self.output_partitioning(projection, &projected_schema)?; - let source = RangePartitionedSource { - inner: source, - output_partitioning, - }; - - Ok(DataSourceExec::from_data_source(source)) - } -} - -impl RangePartitionedTable { - fn output_partitioning( - &self, - projection: Option<&Vec>, - projected_schema: &SchemaRef, - ) -> Result { - let Some(projected_range_index) = - projected_index(self.range_column_index, projection) - else { - return Ok(Partitioning::UnknownPartitioning(self.partitions.len())); - }; - - let range_column = projected_schema.field(projected_range_index).name(); - let ordering = LexOrdering::new(vec![PhysicalSortExpr::new( - physical_col(range_column, projected_schema)?, - SortOptions::default(), - )]) - .expect("range ordering should not be empty"); - - Ok(Partitioning::Range(RangePartitioning::try_new( - ordering, - self.split_points.clone(), - )?)) - } -} - -fn projected_index( - column_index: usize, - projection: Option<&Vec>, -) -> Option { - projection - .map(|projection| projection.iter().position(|idx| *idx == column_index)) - .unwrap_or(Some(column_index)) -} - -#[derive(Clone, Debug)] -struct RangePartitionedSource { - inner: MemorySourceConfig, - output_partitioning: Partitioning, -} - -impl DataSource for RangePartitionedSource { - fn open( - &self, - partition: usize, - context: Arc, - ) -> Result { - self.inner.open(partition, context) - } - - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - self.inner.fmt_as(t, f)?; - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, ", output_partitioning={}", self.output_partitioning) - } - DisplayFormatType::TreeRender => Ok(()), - } - } - - fn output_partitioning(&self) -> Partitioning { - self.output_partitioning.clone() - } - - fn eq_properties(&self) -> EquivalenceProperties { - self.inner.eq_properties() - } - - fn scheduling_type(&self) -> SchedulingType { - self.inner.scheduling_type() - } - - fn partition_statistics(&self, partition: Option) -> Result> { - self.inner.partition_statistics(partition) - } - - fn with_fetch(&self, limit: Option) -> Option> { - Some(Arc::new(Self { - inner: self.inner.clone().with_limit(limit), - output_partitioning: self.output_partitioning.clone(), - })) - } - - fn fetch(&self) -> Option { - self.inner.fetch() - } - - fn try_swapping_with_projection( - &self, - _projection: &ProjectionExprs, - ) -> Result>> { - // Range partitioning metadata is projection-sensitive. This fixture - // computes it in TableProvider::scan, so do not rewrite later - // ProjectionExec nodes into the source. - Ok(None) - } -} - +/// Registers a simple range-partitioned listing table for testing before +/// declaring such tables is supported via SQL. pub(super) fn register_range_partitioned_table(ctx: &SessionContext) { let schema = Arc::new(Schema::new(vec![ Field::new("range_key", DataType::Int32, false), Field::new("non_range_key", DataType::Int32, false), Field::new("value", DataType::Int32, false), ])); - let partitions = vec![ - vec![range_partition_batch(&schema, &[1, 5], &[1, 2], &[10, 50])], - vec![range_partition_batch( - &schema, - &[10, 15], - &[1, 2], - &[100, 150], - )], - vec![range_partition_batch( - &schema, - &[20, 25], - &[1, 2], - &[200, 250], - )], - vec![range_partition_batch( - &schema, - &[30, 35], - &[1, 2], - &[300, 350], - )], - ]; - let split_points = vec![ - SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), - ]; - let table = RangePartitionedTable { + let output_partitioning = Partitioning::Range( + RangePartitioning::try_new( + vec![col("range_key").sort(true, true)], + vec![ + SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + ], + ) + .expect("range partitioning should be valid"), + ); + + register_csv_listing_table( + ctx, + "range_partitioned", + Path::new(env!("CARGO_MANIFEST_DIR")) + .join("test_files/scratch_range_partitioning/range_partitioned"), schema, - partitions, - range_column_index: 0, - split_points, - }; - - ctx.register_table("range_partitioned", Arc::new(table)) - .expect("range partitioned table registration should succeed"); + [ + "1,1,10\n5,2,50\n", + "10,1,100\n15,2,150\n", + "20,1,200\n25,2,250\n", + "30,1,300\n35,2,350\n", + ], + Some(output_partitioning), + ); } -fn range_partition_batch( - schema: &SchemaRef, - range_key: &[i32], - non_range_key: &[i32], - value: &[i32], -) -> RecordBatch { - RecordBatch::try_new( - Arc::clone(schema), - vec![ - Arc::new(Int32Array::from(range_key.to_vec())), - Arc::new(Int32Array::from(non_range_key.to_vec())), - Arc::new(Int32Array::from(value.to_vec())), - ], - ) - .expect("range partition batch should be valid") +fn register_csv_listing_table( + ctx: &SessionContext, + name: &str, + table_dir: impl AsRef, + schema: Arc, + partitions: impl IntoIterator, + output_partitioning: Option, +) { + let table_dir = table_dir.as_ref(); + if table_dir.exists() { + remove_dir_all(table_dir).expect("test table dir should be removable"); + } + create_dir_all(table_dir).expect("test table dir should be created"); + for (idx, rows) in partitions.into_iter().enumerate() { + write(table_dir.join(format!("part-{idx}.csv")), rows) + .expect("test table csv partition should be written"); + } + + let table_path = format!( + "{}/", + table_dir + .to_str() + .expect("test table path should be valid utf8") + ); + let table_url = + ListingTableUrl::parse(&table_path).expect("test table url should parse"); + let options = + ListingOptions::new(Arc::new(CsvFormat::default().with_has_header(false))) + .with_output_partitioning(output_partitioning); + let config = ListingTableConfig::new(table_url) + .with_listing_options(options) + .with_schema(schema); + let table = + ListingTable::try_new(config).expect("test listing table should be valid"); + + ctx.register_table(name, Arc::new(table)) + .expect("test listing table registration should succeed"); } diff --git a/datafusion/sqllogictest/test_files/range_partitioning.slt b/datafusion/sqllogictest/test_files/range_partitioning.slt index a61f17a039eb8..2b7a2cfdf4083 100644 --- a/datafusion/sqllogictest/test_files/range_partitioning.slt +++ b/datafusion/sqllogictest/test_files/range_partitioning.slt @@ -16,7 +16,7 @@ # under the License. # The sqllogictest harness registers range_partitioned(range_key, non_range_key, value) -# as an in-memory source with four physical source partitions: +# as a CSV ListingTable with four declared range-partitioned file groups: # # partition 0: range_key in [..., 10), rows (1, 1, 10), (5, 2, 50) # partition 1: range_key in [10, 20), rows (10, 1, 100), (15, 2, 150) @@ -40,7 +40,7 @@ physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] 02)--RepartitionExec: partitioning=Hash([range_key@0], 4), input_partitions=4 03)----AggregateExec: mode=Partial, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] -04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) +04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false query II SELECT range_key, SUM(value) FROM range_partitioned GROUP BY range_key ORDER BY range_key; @@ -69,7 +69,7 @@ physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] 02)--RepartitionExec: partitioning=Hash([non_range_key@0], 4), input_partitions=4 03)----AggregateExec: mode=Partial, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] -04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=UnknownPartitioning(4) +04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[non_range_key, value], output_partitioning=UnknownPartitioning(4), file_type=csv, has_header=false query II SELECT non_range_key, SUM(value) FROM range_partitioned GROUP BY non_range_key ORDER BY non_range_key; @@ -104,8 +104,8 @@ SELECT range_key, value FROM range_partitioned; ---- physical_plan 01)UnionExec -02)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) -03)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) +02)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false +03)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false query II SELECT range_key, value FROM range_partitioned