From ecd351581799d7795238b3184585555de476a932 Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Sat, 30 May 2026 08:08:12 -0400 Subject: [PATCH 1/6] Add declared file scan output partitioning --- datafusion/catalog-listing/src/options.rs | 25 ++ datafusion/catalog-listing/src/table.rs | 118 ++++++--- .../core/src/datasource/listing/table.rs | 34 ++- .../datasource/src/file_scan_config/mod.rs | 223 ++++++++++++++---- .../proto-models/proto/datafusion.proto | 1 + .../proto-models/src/generated/pbjson.rs | 18 ++ .../proto-models/src/generated/prost.rs | 2 + .../proto/src/physical_plan/from_proto.rs | 17 +- .../proto/src/physical_plan/to_proto.rs | 6 + .../tests/cases/roundtrip_physical_plan.rs | 99 ++++++-- 10 files changed, 430 insertions(+), 113 deletions(-) diff --git a/datafusion/catalog-listing/src/options.rs b/datafusion/catalog-listing/src/options.rs index 55840eb0e3122..930ff47a86db8 100644 --- a/datafusion/catalog-listing/src/options.rs +++ b/datafusion/catalog-listing/src/options.rs @@ -21,6 +21,7 @@ use datafusion_common::plan_err; use datafusion_datasource::ListingTableUrl; use datafusion_datasource::file_format::FileFormat; use datafusion_expr::SortExpr; +use datafusion_physical_expr::Partitioning; use futures::StreamExt; use futures::TryStreamExt; use itertools::Itertools; @@ -53,6 +54,17 @@ pub struct ListingOptions { /// multiple equivalent orderings, the outer `Vec` will have a /// single element. pub file_sort_order: Vec>, + /// Optional declared output partitioning for this table. + /// + /// Expressions are specified against the full table schema. When set, + /// [`ListingTable`](crate::ListingTable) creates one scan file group per + /// declared output partition instead of the scan-time target partition + /// count. Empty file groups are added when needed to preserve that count. + /// + /// Files are sorted by path before grouping. 
DataFusion does not validate + /// that rows match the declaration, so callers must ensure file group `i` + /// contains only rows for declared output partition `i`. + pub output_partitioning: Option, } impl ListingOptions { @@ -66,6 +78,7 @@ impl ListingOptions { format, table_partition_cols: vec![], file_sort_order: vec![], + output_partitioning: None, } } @@ -113,6 +126,18 @@ impl ListingOptions { self } + /// Set declared output partitioning on [`ListingOptions`] and returns self. + /// + /// See [`Self::output_partitioning`]. Empty file groups are added when + /// needed to preserve the declared partition count. + pub fn with_output_partitioning( + mut self, + output_partitioning: Option, + ) -> Self { + self.output_partitioning = output_partitioning; + self + } + /// Set `table partition columns` on [`ListingOptions`] and returns self. /// /// "partition columns," used to support [Hive Partitioning], are diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 04feec2b6e437..bc63c09c4bd44 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -42,6 +42,7 @@ use datafusion_physical_expr::create_lex_ordering; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::Partitioning; use datafusion_physical_plan::empty::EmptyExec; use futures::{Stream, StreamExt, TryStreamExt, future, stream}; use object_store::ObjectStore; @@ -519,12 +520,24 @@ impl TableProvider for ListingTable { // at the same time. This is because the limit should be applied after the filters are applied. 
let statistic_file_limit = if filters.is_empty() { limit } else { None }; + let declared_output_partitioning = self.options.output_partitioning.clone(); + let target_partitions = declared_output_partitioning + .as_ref() + .map(Partitioning::partition_count) + .unwrap_or_else(|| state.config().target_partitions()); + let ListFilesResult { file_groups: mut partitioned_file_lists, statistics, grouped_by_partition: partitioned_by_file_group, } = self - .list_files_for_scan(state, &partition_filters, statistic_file_limit) + .list_files_for_scan_with_target( + state, + &partition_filters, + statistic_file_limit, + target_partitions, + declared_output_partitioning.is_some(), + ) .await?; // if no files need to be read, return an `EmptyExec` @@ -537,17 +550,19 @@ impl TableProvider for ListingTable { state.execution_props(), &partitioned_file_lists, )?; - match state - .config_options() - .execution - .split_file_groups_by_statistics + let split_file_groups_by_statistics = declared_output_partitioning.is_none() + && state + .config_options() + .execution + .split_file_groups_by_statistics; + match split_file_groups_by_statistics .then(|| { output_ordering.first().map(|output_ordering| { FileScanConfig::split_groups_by_statistics_with_target_partitions( &self.table_schema, &partitioned_file_lists, output_ordering, - state.config().target_partitions(), + target_partitions, ) }) }) @@ -555,7 +570,7 @@ impl TableProvider for ListingTable { { Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"), Some(Ok(new_groups)) => { - if new_groups.len() <= state.config().target_partitions() { + if new_groups.len() <= target_partitions { partitioned_file_lists = new_groups; } else { log::debug!( @@ -575,24 +590,27 @@ impl TableProvider for ListingTable { }; let file_source = self.create_file_source(); + let mut scan_config_builder = + FileScanConfigBuilder::new(object_store_url, file_source) + .with_file_groups(partitioned_file_lists) + 
.with_constraints(self.constraints.clone()) + .with_statistics(statistics) + .with_projection_indices(projection)? + .with_limit(limit) + .with_output_ordering(output_ordering) + .with_output_partitioning(declared_output_partitioning) + .with_expr_adapter(self.expr_adapter_factory.clone()); + if partitioned_by_file_group { + scan_config_builder = + scan_config_builder.with_partitioned_by_file_group(true); + } + let scan_config = scan_config_builder.build(); // create the execution plan let plan = self .options .format - .create_physical_plan( - state, - FileScanConfigBuilder::new(object_store_url, file_source) - .with_file_groups(partitioned_file_lists) - .with_constraints(self.constraints.clone()) - .with_statistics(statistics) - .with_projection_indices(projection)? - .with_limit(limit) - .with_output_ordering(output_ordering) - .with_expr_adapter(self.expr_adapter_factory.clone()) - .with_partitioned_by_file_group(partitioned_by_file_group) - .build(), - ) + .create_physical_plan(state, scan_config) .await?; Ok(ScanResult::new(plan)) @@ -704,11 +722,38 @@ impl ListingTable { /// Get the list of files for a scan as well as the file level statistics. /// The list is grouped to let the execution plan know how the files should /// be distributed to different threads / executors. + /// + /// If [`ListingOptions::output_partitioning`] is set, the returned file + /// groups preserve that declared partition count, including empty trailing + /// groups when needed, rather than using the scan-time target partition + /// count. 
pub async fn list_files_for_scan<'a>( &'a self, ctx: &'a dyn Session, filters: &'a [Expr], limit: Option, + ) -> datafusion_common::Result { + let declared_output_partitioning = self.options.output_partitioning.as_ref(); + let target_partitions = declared_output_partitioning + .map(Partitioning::partition_count) + .unwrap_or_else(|| ctx.config().target_partitions()); + self.list_files_for_scan_with_target( + ctx, + filters, + limit, + target_partitions, + declared_output_partitioning.is_some(), + ) + .await + } + + async fn list_files_for_scan_with_target<'a>( + &'a self, + ctx: &'a dyn Session, + filters: &'a [Expr], + limit: Option, + target_partitions: usize, + preserve_partition_count: bool, ) -> datafusion_common::Result { let store = if let Some(url) = self.table_paths.first() { ctx.runtime_env().object_store(url)? @@ -761,27 +806,26 @@ impl ListingTable { // hash repartitioning for aggregates and joins on partition columns. let threshold = ctx.config_options().optimizer.preserve_file_partitions; - let (file_groups, grouped_by_partition) = - if threshold > 0 && !self.options.table_partition_cols.is_empty() { - let grouped = file_group - .group_by_partition_values(ctx.config().target_partitions()); - if grouped.len() >= threshold { - (grouped, true) - } else { - let all_files: Vec<_> = - grouped.into_iter().flat_map(|g| g.into_inner()).collect(); - ( - FileGroup::new(all_files) - .split_files(ctx.config().target_partitions()), - false, - ) - } + let (mut file_groups, grouped_by_partition) = if preserve_partition_count { + (file_group.split_files(target_partitions), false) + } else if threshold > 0 && !self.options.table_partition_cols.is_empty() { + let grouped = file_group.group_by_partition_values(target_partitions); + if grouped.len() >= threshold { + (grouped, true) } else { + let all_files: Vec<_> = + grouped.into_iter().flat_map(|g| g.into_inner()).collect(); ( - file_group.split_files(ctx.config().target_partitions()), + 
FileGroup::new(all_files).split_files(target_partitions), false, ) - }; + } + } else { + (file_group.split_files(target_partitions), false) + }; + if preserve_partition_count && !file_groups.is_empty() { + file_groups.resize_with(target_partitions, || FileGroup::new(vec![])); + } let (file_groups, stats) = compute_all_files_statistics( file_groups, diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 50b3855a0ab7c..404d53cf563a5 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -143,7 +143,7 @@ mod tests { use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::statistics::StatisticsArgs; - use datafusion_physical_plan::{ExecutionPlanProperties, collect}; + use datafusion_physical_plan::{ExecutionPlanProperties, Partitioning, collect}; use std::collections::HashMap; use std::io::Write; use std::sync::Arc; @@ -1289,6 +1289,38 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_list_files_uses_declared_output_partitioning_count() -> Result<()> { + let files = ["bucket/key-prefix/file0", "bucket/key-prefix/file1"]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_target_partitions(1) + .with_output_partitioning(Some(Partitioning::RoundRobinBatch(4))); + + let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let result = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let group_sizes = result + .file_groups + 
.iter() + .map(|group| group.len()) + .collect::>(); + + assert_eq!(group_sizes, vec![1, 1, 0, 0]); + + Ok(()) + } + #[tokio::test] async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> { let files = [ diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index b1ba0584c96a0..536df3ac49720 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -40,7 +40,7 @@ use datafusion_expr::Operator; use crate::source::OpenArgs; use datafusion_physical_expr::expressions::{BinaryExpr, Column}; -use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr::projection::{ProjectionExprs, ProjectionMapping}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; @@ -206,6 +206,13 @@ pub struct FileScanConfig { /// If the number of file partitions > target_partitions, the file partitions will be grouped /// in a round-robin fashion such that number of file partitions = target_partitions. pub partitioned_by_file_group: bool, + /// Optional declared output partitioning of this file scan. + /// + /// Expressions are in terms of the full table schema, before scan + /// projection or filtering. If the partition count does not match the + /// number of file groups, [`DataSource::output_partitioning`] falls back to + /// [`Partitioning::UnknownPartitioning`]. + pub output_partitioning: Option, } /// A builder for [`FileScanConfig`]'s. 
@@ -274,6 +281,7 @@ pub struct FileScanConfigBuilder { file_groups: Vec, statistics: Option, output_ordering: Vec, + output_partitioning: Option, file_compression_type: Option, batch_size: Option, expr_adapter_factory: Option>, @@ -297,6 +305,7 @@ impl FileScanConfigBuilder { file_groups: vec![], statistics: None, output_ordering: vec![], + output_partitioning: None, file_compression_type: None, limit: None, preserve_order: false, @@ -463,6 +472,17 @@ impl FileScanConfigBuilder { self } + /// Set declared output partitioning for this scan. + /// + /// See [`FileScanConfig::output_partitioning`]. + pub fn with_output_partitioning( + mut self, + output_partitioning: Option, + ) -> Self { + self.output_partitioning = output_partitioning; + self + } + /// Set the file compression type pub fn with_file_compression_type( mut self, @@ -521,6 +541,7 @@ impl FileScanConfigBuilder { file_groups, statistics, output_ordering, + output_partitioning, file_compression_type, batch_size, expr_adapter_factory: expr_adapter, @@ -550,6 +571,7 @@ impl FileScanConfigBuilder { expr_adapter_factory: expr_adapter, statistics, partitioned_by_file_group, + output_partitioning, } } } @@ -562,6 +584,7 @@ impl From for FileScanConfigBuilder { file_groups: config.file_groups, statistics: Some(config.statistics), output_ordering: config.output_ordering, + output_partitioning: config.output_partitioning, file_compression_type: Some(config.file_compression_type), limit: config.limit, preserve_order: config.preserve_order, @@ -573,6 +596,52 @@ impl From for FileScanConfigBuilder { } } +fn hash_partitioning_from_partition_fields( + schema: &Schema, + partition_cols: &Fields, + partition_count: usize, +) -> Option { + if partition_cols.is_empty() { + return None; + } + + let mut exprs: Vec> = Vec::with_capacity(partition_cols.len()); + for partition_col in partition_cols { + let name = partition_col.name(); + let idx = schema + .fields() + .iter() + .position(|field| field.name() == name)?; + 
exprs.push(Arc::new(Column::new(name, idx))); + } + + Some(Partitioning::Hash(exprs, partition_count)) +} + +fn project_output_partitioning( + partitioning: &Partitioning, + mapping: &ProjectionMapping, + input_schema: &SchemaRef, + partition_count: usize, +) -> Partitioning { + let input_eq_properties = EquivalenceProperties::new(Arc::clone(input_schema)); + match partitioning { + Partitioning::Hash(exprs, _) => { + let projected_exprs = input_eq_properties + .project_expressions(exprs, mapping) + .collect::>>(); + projected_exprs + .map(|exprs| Partitioning::Hash(exprs, partition_count)) + .unwrap_or_else(|| Partitioning::UnknownPartitioning(partition_count)) + } + Partitioning::Range(_) + | Partitioning::RoundRobinBatch(_) + | Partitioning::UnknownPartitioning(_) => { + partitioning.project(mapping, &input_eq_properties) + } + } +} + impl DataSource for FileScanConfig { fn open( &self, @@ -660,6 +729,10 @@ impl DataSource for FileScanConfig { display_orderings(f, &orderings)?; + if self.output_partitioning.is_some() { + write!(f, ", output_partitioning={}", self.output_partitioning())?; + } + if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; } @@ -683,10 +756,9 @@ impl DataSource for FileScanConfig { repartition_file_min_size: usize, output_ordering: Option, ) -> Result>> { - // When files are grouped by partition values, we cannot allow byte-range - // splitting. It would mix rows from different partition values across - // file groups, breaking the Hash partitioning. - if self.partitioned_by_file_group { + // When file groups define output partitioning, repartitioning files + // would invalidate the partition-to-file-group mapping. + if self.output_partitioning.is_some() || self.partitioned_by_file_group { return Ok(None); } @@ -702,54 +774,60 @@ impl DataSource for FileScanConfig { /// Returns the output partitioning for this file scan. 
/// - /// When `partitioned_by_file_group` is true, this returns `Partitioning::Hash` on - /// the Hive partition columns, allowing the optimizer to skip hash repartitioning - /// for aggregates and joins on those columns. + /// When output partitioning is declared, this returns it after remapping + /// through the scan projection. Otherwise, when `partitioned_by_file_group` + /// is true, this returns `Partitioning::Hash` on the Hive partition + /// columns, allowing the optimizer to skip repartitioning for compatible + /// aggregates and joins. /// /// Tradeoffs - /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with - /// `GROUP BY` or `ORDER BY` on partition columns. - /// - Cost: Files are grouped by partition values rather than split by byte - /// ranges, which may reduce I/O parallelism when partition sizes are uneven. - /// For simple aggregations without `ORDER BY`, this cost may outweigh the benefit. + /// - Benefit: Eliminates `RepartitionExec` for compatible queries. + /// - Cost: File groups must remain intact, so byte-range file splitting + /// and sibling work stealing are disabled. /// /// Follow-up Work - /// - Idea: Could allow byte-range splitting within partition-aware groups, + /// - Idea: Could allow byte-range splitting within each output partition, /// preserving I/O parallelism while maintaining partition semantics. fn output_partitioning(&self) -> Partitioning { - if self.partitioned_by_file_group { - let partition_cols = self.table_partition_cols(); - if !partition_cols.is_empty() { - let projected_schema = match self.projected_schema() { - Ok(schema) => schema, - Err(_) => { - debug!( - "Could not get projected schema, falling back to UnknownPartitioning." 
- ); - return Partitioning::UnknownPartitioning(self.file_groups.len()); - } - }; - - // Build Column expressions for partition columns based on their - // position in the projected schema - let mut exprs: Vec> = Vec::new(); - for partition_col in partition_cols { - if let Some((idx, _)) = projected_schema - .fields() - .iter() - .enumerate() - .find(|(_, f)| f.name() == partition_col.name()) - { - exprs.push(Arc::new(Column::new(partition_col.name(), idx))); - } - } + let Some(output_partitioning) = self.output_partitioning.clone().or_else(|| { + self.partitioned_by_file_group.then(|| { + hash_partitioning_from_partition_fields( + self.file_source.table_schema().table_schema(), + self.table_partition_cols(), + self.file_groups.len(), + ) + })? + }) else { + return Partitioning::UnknownPartitioning(self.file_groups.len()); + }; + if output_partitioning.partition_count() != self.file_groups.len() { + debug!( + "Declared output partitioning has {} partitions, but file scan has {} file groups. 
Falling back to UnknownPartitioning.", + output_partitioning.partition_count(), + self.file_groups.len() + ); + return Partitioning::UnknownPartitioning(self.file_groups.len()); + } - if exprs.len() == partition_cols.len() { - return Partitioning::Hash(exprs, self.file_groups.len()); + if let Some(projection) = self.file_source.projection() { + let schema = self.file_source.table_schema().table_schema(); + return match projection.projection_mapping(schema) { + Ok(mapping) => project_output_partitioning( + &output_partitioning, + &mapping, + schema, + self.file_groups.len(), + ), + Err(e) => { + debug!( + "Could not project output partitioning, falling back to UnknownPartitioning: {e}" + ); + Partitioning::UnknownPartitioning(self.file_groups.len()) } - } + }; } - Partitioning::UnknownPartitioning(self.file_groups.len()) + + output_partitioning } /// Computes the effective equivalence properties of this file scan, taking @@ -1043,7 +1121,10 @@ impl DataSource for FileScanConfig { /// when file order must be preserved or the file groups define the output /// partitioning needed for the rest of the plan fn create_sibling_state(&self) -> Option> { - if self.preserve_order || self.partitioned_by_file_group { + if self.preserve_order + || self.output_partitioning.is_some() + || self.partitioned_by_file_group + { return None; } @@ -2459,6 +2540,58 @@ mod tests { assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_))); } + #[test] + fn test_declared_output_partitioning_projects_with_scan() { + let file_schema = aggr_test_schema(); + let output_partitioning = + Partitioning::Hash(vec![Arc::new(Column::new("c2", 1))], 4); + + let mut config = config_for_projection( + Arc::clone(&file_schema), + Some(vec![1, 2]), + Statistics::new_unknown(&file_schema), + vec![], + ); + config.file_groups = vec![ + FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]), + 
FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f4.parquet".to_string(), 1024)]), + ]; + config.output_partitioning = Some(output_partitioning); + + match config.output_partitioning() { + Partitioning::Hash(exprs, num_partitions) => { + assert_eq!(num_partitions, 4); + assert_eq!(exprs.len(), 1); + let column = exprs[0].downcast_ref::().unwrap(); + assert_eq!(column.name(), "c2"); + assert_eq!(column.index(), 0); + } + _ => panic!("Expected Hash partitioning"), + } + + let mut config = config_for_projection( + Arc::clone(&file_schema), + Some(vec![2]), + Statistics::new_unknown(&file_schema), + vec![], + ); + config.file_groups = vec![ + FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f4.parquet".to_string(), 1024)]), + ]; + config.output_partitioning = + Some(Partitioning::Hash(vec![Arc::new(Column::new("c2", 1))], 4)); + + assert!(matches!( + config.output_partitioning(), + Partitioning::UnknownPartitioning(4) + )); + } + #[test] fn test_output_partitioning_no_partition_columns() { let file_schema = aggr_test_schema(); diff --git a/datafusion/proto-models/proto/datafusion.proto b/datafusion/proto-models/proto/datafusion.proto index 2f5b75e40937e..a3d18e4050456 100644 --- a/datafusion/proto-models/proto/datafusion.proto +++ b/datafusion/proto-models/proto/datafusion.proto @@ -1181,6 +1181,7 @@ message FileScanExecConf { optional ProjectionExprs projection_exprs = 13; optional bool partitioned_by_file_group = 14; + optional Partitioning output_partitioning = 15; } message ParquetScanExecNode { diff --git a/datafusion/proto-models/src/generated/pbjson.rs b/datafusion/proto-models/src/generated/pbjson.rs index 733da68fe89c2..0a71b4513801f 100644 --- 
a/datafusion/proto-models/src/generated/pbjson.rs +++ b/datafusion/proto-models/src/generated/pbjson.rs @@ -6966,6 +6966,9 @@ impl serde::Serialize for FileScanExecConf { if self.partitioned_by_file_group.is_some() { len += 1; } + if self.output_partitioning.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FileScanExecConf", len)?; if !self.file_groups.is_empty() { struct_ser.serialize_field("fileGroups", &self.file_groups)?; @@ -7005,6 +7008,9 @@ impl serde::Serialize for FileScanExecConf { if let Some(v) = self.partitioned_by_file_group.as_ref() { struct_ser.serialize_field("partitionedByFileGroup", v)?; } + if let Some(v) = self.output_partitioning.as_ref() { + struct_ser.serialize_field("outputPartitioning", v)?; + } struct_ser.end() } } @@ -7034,6 +7040,8 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "projectionExprs", "partitioned_by_file_group", "partitionedByFileGroup", + "output_partitioning", + "outputPartitioning", ]; #[allow(clippy::enum_variant_names)] @@ -7050,6 +7058,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { BatchSize, ProjectionExprs, PartitionedByFileGroup, + OutputPartitioning, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7083,6 +7092,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "batchSize" | "batch_size" => Ok(GeneratedField::BatchSize), "projectionExprs" | "projection_exprs" => Ok(GeneratedField::ProjectionExprs), "partitionedByFileGroup" | "partitioned_by_file_group" => Ok(GeneratedField::PartitionedByFileGroup), + "outputPartitioning" | "output_partitioning" => Ok(GeneratedField::OutputPartitioning), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7114,6 +7124,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { let mut batch_size__ = None; let mut projection_exprs__ = None; let mut partitioned_by_file_group__ = None; + let mut output_partitioning__ = None; 
while let Some(k) = map_.next_key()? { match k { GeneratedField::FileGroups => { @@ -7193,6 +7204,12 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { } partitioned_by_file_group__ = map_.next_value()?; } + GeneratedField::OutputPartitioning => { + if output_partitioning__.is_some() { + return Err(serde::de::Error::duplicate_field("outputPartitioning")); + } + output_partitioning__ = map_.next_value()?; + } } } Ok(FileScanExecConf { @@ -7208,6 +7225,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { batch_size: batch_size__, projection_exprs: projection_exprs__, partitioned_by_file_group: partitioned_by_file_group__, + output_partitioning: output_partitioning__, }) } } diff --git a/datafusion/proto-models/src/generated/prost.rs b/datafusion/proto-models/src/generated/prost.rs index 4a2edeeb11eca..fac08e6b81bed 100644 --- a/datafusion/proto-models/src/generated/prost.rs +++ b/datafusion/proto-models/src/generated/prost.rs @@ -1802,6 +1802,8 @@ pub struct FileScanExecConf { pub projection_exprs: ::core::option::Option, #[prost(bool, optional, tag = "14")] pub partitioned_by_file_group: ::core::option::Option, + #[prost(message, optional, tag = "15")] + pub output_partitioning: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetScanExecNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 7bd1a3dd66d01..53ff4a41d466e 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -552,6 +552,12 @@ pub fn parse_protobuf_file_scan_config( )?; output_ordering.extend(LexOrdering::new(sort_exprs)); } + let output_partitioning = parse_protobuf_partitioning( + proto.output_partitioning.as_ref(), + ctx, + &schema, + proto_converter, + )?; // Parse projection expressions if present and apply to file source let file_source = if let Some(proto_projection_exprs) = &proto.projection_exprs { @@ 
-580,15 +586,18 @@ pub fn parse_protobuf_file_scan_config( file_source }; - let config = FileScanConfigBuilder::new(object_store_url, file_source) + let mut config_builder = FileScanConfigBuilder::new(object_store_url, file_source) .with_file_groups(file_groups) .with_constraints(constraints) .with_statistics(statistics) .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_output_ordering(output_ordering) - .with_batch_size(proto.batch_size.map(|s| s as usize)) - .with_partitioned_by_file_group(proto.partitioned_by_file_group.unwrap_or(false)) - .build(); + .with_output_partitioning(output_partitioning) + .with_batch_size(proto.batch_size.map(|s| s as usize)); + if proto.partitioned_by_file_group.unwrap_or(false) { + config_builder = config_builder.with_partitioned_by_file_group(true); + } + let config = config_builder.build(); Ok(config) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 7310c0928eee4..4614c4f002169 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -498,6 +498,11 @@ pub fn serialize_file_scan_config( serialize_physical_sort_exprs(order.to_vec(), codec, proto_converter)?; output_orderings.push(ordering) } + let output_partitioning = conf + .output_partitioning + .as_ref() + .map(|partitioning| serialize_partitioning(partitioning, codec, proto_converter)) + .transpose()?; // Fields must be added to the schema so that they can persist in the protobuf, // and then they are to be removed from the schema in `parse_protobuf_file_scan_config` @@ -558,6 +563,7 @@ pub fn serialize_file_scan_config( batch_size: conf.batch_size.map(|s| s as u64), projection_exprs, partitioned_by_file_group: Some(conf.partitioned_by_file_group), + output_partitioning, }) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8e80467788598..2022857d4e59d 
100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -4080,10 +4080,26 @@ fn test_custom_node_with_dynamic_filter_dedup_roundtrip() -> Result<()> { Ok(()) } +fn roundtrip_file_scan_config(scan_config: FileScanConfig) -> Result { + let exec_plan: Arc = DataSourceExec::from_data_source(scan_config); + let ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + let proto_converter = DefaultPhysicalProtoConverter {}; + let result_plan = + roundtrip_test_and_return(exec_plan, &ctx, &codec, &proto_converter)?; + + let data_source_exec = result_plan + .downcast_ref::() + .expect("Expected DataSourceExec"); + let file_scan_config = data_source_exec + .data_source() + .downcast_ref::() + .expect("Expected FileScanConfig"); + Ok(file_scan_config.clone()) +} + #[test] fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> { - use datafusion::datasource::physical_plan::FileScanConfig; - let file_schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); @@ -4096,34 +4112,65 @@ fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> { .with_partitioned_by_file_group(true) .build(); - assert!(scan_config.partitioned_by_file_group); + assert!(roundtrip_file_scan_config(scan_config)?.partitioned_by_file_group); + Ok(()) +} - let exec_plan: Arc = DataSourceExec::from_data_source(scan_config); +#[test] +fn roundtrip_parquet_exec_output_partitioning() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); + let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); + let output_partitioning = + Partitioning::Hash(vec![Arc::new(Column::new("col", 0))], 1); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + 
.with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )])]) + .with_output_partitioning(Some(output_partitioning.clone())) + .build(); - let ctx = SessionContext::new(); - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DefaultPhysicalProtoConverter {}; - let bytes = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan), - &codec, - &proto_converter, - )?; - let result_plan = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &proto_converter, - )?; + assert_eq!( + roundtrip_file_scan_config(scan_config)?.output_partitioning, + Some(output_partitioning) + ); - let data_source_exec = result_plan - .downcast_ref::() - .expect("Expected DataSourceExec"); - let file_scan_config = data_source_exec - .data_source() - .downcast_ref::() - .expect("Expected FileScanConfig"); + Ok(()) +} + +#[test] +fn roundtrip_parquet_exec_range_output_partitioning() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, false)])); + let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); + let output_partitioning = Partitioning::Range(RangePartitioning::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new( + "col", 0, + )))]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(10))])], + )); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(vec![ + FileGroup::new(vec![PartitionedFile::new( + "/path/to/file-1.parquet".to_string(), + 1024, + )]), + FileGroup::new(vec![PartitionedFile::new( + "/path/to/file-2.parquet".to_string(), + 1024, + )]), + ]) + .with_output_partitioning(Some(output_partitioning.clone())) + .build(); - assert!(file_scan_config.partitioned_by_file_group); + assert_eq!( + roundtrip_file_scan_config(scan_config)?.output_partitioning, + 
Some(output_partitioning) + ); Ok(()) } From 296da5994006e7a603bd7127226ec7cbbe30e1cc Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Sat, 30 May 2026 09:06:01 -0400 Subject: [PATCH 2/6] Use ListingTable for range partitioning sqllogictest --- .../src/test_context/range_partitioning.rs | 292 +++++------------- .../test_files/range_partitioning.slt | 10 +- 2 files changed, 84 insertions(+), 218 deletions(-) diff --git a/datafusion/sqllogictest/src/test_context/range_partitioning.rs b/datafusion/sqllogictest/src/test_context/range_partitioning.rs index 88e49708baf60..55e4f09663d90 100644 --- a/datafusion/sqllogictest/src/test_context/range_partitioning.rs +++ b/datafusion/sqllogictest/src/test_context/range_partitioning.rs @@ -15,236 +15,102 @@ // specific language governing permissions and limitations // under the License. -use std::fmt; +use std::fs::{create_dir_all, remove_dir_all, write}; +use std::path::Path; use std::sync::Arc; -use arrow::array::Int32Array; use arrow::compute::SortOptions; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; -use async_trait::async_trait; -use datafusion::catalog::Session; -use datafusion::common::{Result, ScalarValue, project_schema}; -use datafusion::datasource::source::{DataSource, DataSourceExec}; -use datafusion::datasource::{TableProvider, TableType}; -use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::Expr; -use datafusion::physical_expr::EquivalenceProperties; -use datafusion::physical_expr::expressions::col as physical_col; -use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; -use datafusion::physical_plan::execution_plan::SchedulingType; -use datafusion::physical_plan::projection::ProjectionExprs; -use datafusion::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RangePartitioning, - SendableRecordBatchStream, SplitPoint, Statistics, +use arrow::datatypes::{DataType, Field, Schema}; +use 
datafusion::common::ScalarValue; +use datafusion::datasource::file_format::csv::CsvFormat; +use datafusion::datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; +use datafusion::physical_expr::expressions::Column; +use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion::physical_plan::{Partitioning, RangePartitioning, SplitPoint}; use datafusion::prelude::SessionContext; -use datafusion_datasource::memory::MemorySourceConfig; // ============================================================================== // Range Partitioned Table (sqllogictest-only) // ============================================================================== -/// Simple range-partitioned table for testing before declaring such tables is -/// supported via SQL. -#[derive(Debug)] -struct RangePartitionedTable { - schema: SchemaRef, - partitions: Vec>, - range_column_index: usize, - split_points: Vec, -} - -#[async_trait] -impl TableProvider for RangePartitionedTable { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - state: &dyn Session, - projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> Result> { - let projected_schema = project_schema(&self.schema, projection)?; - let mut source = MemorySourceConfig::try_new( - &self.partitions, - Arc::clone(&self.schema), - projection.cloned(), - )?; - source = source.with_show_sizes(state.config_options().explain.show_sizes); - - let output_partitioning = - self.output_partitioning(projection, &projected_schema)?; - let source = RangePartitionedSource { - inner: source, - output_partitioning, - }; - - Ok(DataSourceExec::from_data_source(source)) - } -} - -impl RangePartitionedTable { - fn output_partitioning( - &self, - projection: Option<&Vec>, - projected_schema: &SchemaRef, - ) -> Result { - let Some(projected_range_index) = - 
projected_index(self.range_column_index, projection) - else { - return Ok(Partitioning::UnknownPartitioning(self.partitions.len())); - }; - - let range_column = projected_schema.field(projected_range_index).name(); - let ordering = LexOrdering::new(vec![PhysicalSortExpr::new( - physical_col(range_column, projected_schema)?, - SortOptions::default(), - )]) - .expect("range ordering should not be empty"); - - Ok(Partitioning::Range(RangePartitioning::try_new( - ordering, - self.split_points.clone(), - )?)) - } -} - -fn projected_index( - column_index: usize, - projection: Option<&Vec>, -) -> Option { - projection - .map(|projection| projection.iter().position(|idx| *idx == column_index)) - .unwrap_or(Some(column_index)) -} - -#[derive(Clone, Debug)] -struct RangePartitionedSource { - inner: MemorySourceConfig, - output_partitioning: Partitioning, -} - -impl DataSource for RangePartitionedSource { - fn open( - &self, - partition: usize, - context: Arc, - ) -> Result { - self.inner.open(partition, context) - } - - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - self.inner.fmt_as(t, f)?; - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, ", output_partitioning={}", self.output_partitioning) - } - DisplayFormatType::TreeRender => Ok(()), - } - } - - fn output_partitioning(&self) -> Partitioning { - self.output_partitioning.clone() - } - - fn eq_properties(&self) -> EquivalenceProperties { - self.inner.eq_properties() - } - - fn scheduling_type(&self) -> SchedulingType { - self.inner.scheduling_type() - } - - fn partition_statistics(&self, partition: Option) -> Result> { - self.inner.partition_statistics(partition) - } - - fn with_fetch(&self, limit: Option) -> Option> { - Some(Arc::new(Self { - inner: self.inner.clone().with_limit(limit), - output_partitioning: self.output_partitioning.clone(), - })) - } - - fn fetch(&self) -> Option { - self.inner.fetch() - } - - fn try_swapping_with_projection( - 
&self, - _projection: &ProjectionExprs, - ) -> Result>> { - // Range partitioning metadata is projection-sensitive. This fixture - // computes it in TableProvider::scan, so do not rewrite later - // ProjectionExec nodes into the source. - Ok(None) - } -} - +/// Registers a simple range-partitioned listing table for testing before +/// declaring such tables is supported via SQL. pub(super) fn register_range_partitioned_table(ctx: &SessionContext) { let schema = Arc::new(Schema::new(vec![ Field::new("range_key", DataType::Int32, false), Field::new("non_range_key", DataType::Int32, false), Field::new("value", DataType::Int32, false), ])); - let partitions = vec![ - vec![range_partition_batch(&schema, &[1, 5], &[1, 2], &[10, 50])], - vec![range_partition_batch( - &schema, - &[10, 15], - &[1, 2], - &[100, 150], - )], - vec![range_partition_batch( - &schema, - &[20, 25], - &[1, 2], - &[200, 250], - )], - vec![range_partition_batch( - &schema, - &[30, 35], - &[1, 2], - &[300, 350], - )], - ]; - let split_points = vec![ - SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), - ]; - let table = RangePartitionedTable { + let ordering = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("range_key", 0)), + SortOptions::default(), + )]) + .expect("range ordering should not be empty"); + let output_partitioning = Partitioning::Range( + RangePartitioning::try_new( + ordering, + vec![ + SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + ], + ) + .expect("range partitioning should be valid"), + ); + + register_csv_listing_table( + ctx, + "range_partitioned", + Path::new(env!("CARGO_MANIFEST_DIR")) + .join("test_files/scratch_range_partitioning/range_partitioned"), schema, - partitions, - range_column_index: 0, - split_points, - }; - - 
ctx.register_table("range_partitioned", Arc::new(table)) - .expect("range partitioned table registration should succeed"); + [ + "1,1,10\n5,2,50\n", + "10,1,100\n15,2,150\n", + "20,1,200\n25,2,250\n", + "30,1,300\n35,2,350\n", + ], + Some(output_partitioning), + ); } -fn range_partition_batch( - schema: &SchemaRef, - range_key: &[i32], - non_range_key: &[i32], - value: &[i32], -) -> RecordBatch { - RecordBatch::try_new( - Arc::clone(schema), - vec![ - Arc::new(Int32Array::from(range_key.to_vec())), - Arc::new(Int32Array::from(non_range_key.to_vec())), - Arc::new(Int32Array::from(value.to_vec())), - ], - ) - .expect("range partition batch should be valid") +fn register_csv_listing_table( + ctx: &SessionContext, + name: &str, + table_dir: impl AsRef, + schema: Arc, + partitions: impl IntoIterator, + output_partitioning: Option, +) { + let table_dir = table_dir.as_ref(); + if table_dir.exists() { + remove_dir_all(table_dir).expect("test table dir should be removable"); + } + create_dir_all(table_dir).expect("test table dir should be created"); + for (idx, rows) in partitions.into_iter().enumerate() { + write(table_dir.join(format!("part-{idx}.csv")), rows) + .expect("test table csv partition should be written"); + } + + let table_path = format!( + "{}/", + table_dir + .to_str() + .expect("test table path should be valid utf8") + ); + let table_url = + ListingTableUrl::parse(&table_path).expect("test table url should parse"); + let options = + ListingOptions::new(Arc::new(CsvFormat::default().with_has_header(false))) + .with_output_partitioning(output_partitioning); + let config = ListingTableConfig::new(table_url) + .with_listing_options(options) + .with_schema(schema); + let table = + ListingTable::try_new(config).expect("test listing table should be valid"); + + ctx.register_table(name, Arc::new(table)) + .expect("test listing table registration should succeed"); } diff --git a/datafusion/sqllogictest/test_files/range_partitioning.slt 
b/datafusion/sqllogictest/test_files/range_partitioning.slt index a61f17a039eb8..2b7a2cfdf4083 100644 --- a/datafusion/sqllogictest/test_files/range_partitioning.slt +++ b/datafusion/sqllogictest/test_files/range_partitioning.slt @@ -16,7 +16,7 @@ # under the License. # The sqllogictest harness registers range_partitioned(range_key, non_range_key, value) -# as an in-memory source with four physical source partitions: +# as a CSV ListingTable with four declared range-partitioned file groups: # # partition 0: range_key in [..., 10), rows (1, 1, 10), (5, 2, 50) # partition 1: range_key in [10, 20), rows (10, 1, 100), (15, 2, 150) @@ -40,7 +40,7 @@ physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] 02)--RepartitionExec: partitioning=Hash([range_key@0], 4), input_partitions=4 03)----AggregateExec: mode=Partial, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] -04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) +04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false query II SELECT range_key, SUM(value) FROM range_partitioned GROUP BY range_key ORDER BY range_key; @@ -69,7 +69,7 @@ physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] 02)--RepartitionExec: 
partitioning=Hash([non_range_key@0], 4), input_partitions=4 03)----AggregateExec: mode=Partial, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] -04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=UnknownPartitioning(4) +04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[non_range_key, value], output_partitioning=UnknownPartitioning(4), file_type=csv, has_header=false query II SELECT non_range_key, SUM(value) FROM range_partitioned GROUP BY non_range_key ORDER BY non_range_key; @@ -104,8 +104,8 @@ SELECT range_key, value FROM range_partitioned; ---- physical_plan 01)UnionExec -02)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) -03)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) +02)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false 
+03)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false query II SELECT range_key, value FROM range_partitioned From 72f66035396f322de836b98511898b1cfce56879 Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Sat, 30 May 2026 16:51:28 -0400 Subject: [PATCH 3/6] Handle pruned declared file partitioning --- Cargo.lock | 1 - datafusion/catalog-listing/src/table.rs | 15 +++++- .../core/src/datasource/listing/table.rs | 48 ++++++++++++++++++- datafusion/sqllogictest/Cargo.toml | 1 - 4 files changed, 60 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2b729677d76d..4bd37c6871cd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2632,7 +2632,6 @@ dependencies = [ "chrono", "clap", "datafusion", - "datafusion-datasource", "datafusion-spark", "datafusion-substrait", "env_logger", diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index bc63c09c4bd44..9794617b66ef5 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -520,7 +520,14 @@ impl TableProvider for ListingTable { // at the same time. This is because the limit should be applied after the filters are applied. 
let statistic_file_limit = if filters.is_empty() { limit } else { None }; - let declared_output_partitioning = self.options.output_partitioning.clone(); + let declared_output_partitioning = if partition_filters.is_empty() { + self.options.output_partitioning.clone() + } else { + // Partition pruning can remove files before grouping. Without a + // stable file-to-declared-partition mapping, regrouping the + // remaining files could shift them into the wrong partition index. + None + }; let target_partitions = declared_output_partitioning .as_ref() .map(Partitioning::partition_count) @@ -755,6 +762,12 @@ impl ListingTable { target_partitions: usize, preserve_partition_count: bool, ) -> datafusion_common::Result { + if target_partitions == 0 { + return plan_err!( + "ListingTable requires target_partitions to be greater than zero" + ); + } + let store = if let Some(url) = self.table_paths.first() { ctx.runtime_env().object_store(url)? } else { diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 404d53cf563a5..9d52e5447d64d 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -139,11 +139,13 @@ mod tests { use datafusion_expr::dml::InsertOp; use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; use datafusion_physical_expr::PhysicalSortExpr; - use datafusion_physical_expr::expressions::binary; + use datafusion_physical_expr::expressions::{Column, binary}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::statistics::StatisticsArgs; - use datafusion_physical_plan::{ExecutionPlanProperties, Partitioning, collect}; + use datafusion_physical_plan::{ + ExecutionPlanProperties, Partitioning, RangePartitioning, SplitPoint, collect, + }; use std::collections::HashMap; use std::io::Write; use std::sync::Arc; @@ -1321,6 +1323,48 @@ mod tests { Ok(()) } + 
#[tokio::test] + async fn test_partition_filter_drops_declared_output_partitioning() -> Result<()> { + let files = ["bucket/test/pid=1/file1", "bucket/test/pid=2/file2"]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let output_partitioning = Partitioning::Range(RangePartitioning::try_new( + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("pid", 1)), + SortOptions::default(), + )]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(2))])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_table_partition_cols(vec![("pid".to_string(), DataType::Int32)]) + .with_output_partitioning(Some(output_partitioning.clone())); + + let table_path = ListingTableUrl::parse("test:///bucket/test/")?; + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let unfiltered = table.scan(&ctx.state(), None, &[], None).await?; + assert_eq!(unfiltered.output_partitioning(), &output_partitioning); + + let filter = Expr::eq(col("pid"), lit(2_i32)); + let filtered = table.scan(&ctx.state(), None, &[filter], None).await?; + assert!(matches!( + filtered.output_partitioning(), + Partitioning::UnknownPartitioning(1) + )); + + Ok(()) + } + #[tokio::test] async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> { let files = [ diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index a0c18c90867c7..13493d16c05e5 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -47,7 +47,6 @@ bytes = { workspace = true, optional = true } chrono = { workspace = true, optional = true } clap = { version = "4.5.60", features = ["derive", "env"] } datafusion = { workspace = 
true, default-features = true, features = ["avro"] } -datafusion-datasource = { workspace = true } datafusion-spark = { workspace = true, features = ["core"] } datafusion-substrait = { workspace = true, default-features = true, optional = true } futures = { workspace = true } From 54b598dab10358fff388c5ee599b071f8468abee Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Thu, 4 Jun 2026 09:49:02 -0400 Subject: [PATCH 4/6] Use logical output partitioning declarations --- Cargo.lock | 1 + datafusion/catalog-listing/Cargo.toml | 1 + datafusion/catalog-listing/src/helpers.rs | 4 +- datafusion/catalog-listing/src/options.rs | 20 +- datafusion/catalog-listing/src/table.rs | 292 +++++++++++++----- .../core/src/datasource/listing/table.rs | 179 ++++++++++- .../datasource/src/file_scan_config/mod.rs | 17 +- .../src/test_context/range_partitioning.rs | 14 +- 8 files changed, 407 insertions(+), 121 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4bd37c6871cd1..a4f20417cdb41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1819,6 +1819,7 @@ dependencies = [ "datafusion-datasource-parquet", "datafusion-execution", "datafusion-expr", + "datafusion-expr-common", "datafusion-physical-expr", "datafusion-physical-expr-adapter", "datafusion-physical-expr-common", diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml index 61b55397137df..417bc0d0ac710 100644 --- a/datafusion/catalog-listing/Cargo.toml +++ b/datafusion/catalog-listing/Cargo.toml @@ -38,6 +38,7 @@ datafusion-common = { workspace = true, features = ["object_store"] } datafusion-datasource = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-expr-common = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-adapter = { workspace = true } datafusion-physical-expr-common = { workspace = true } diff --git a/datafusion/catalog-listing/src/helpers.rs 
b/datafusion/catalog-listing/src/helpers.rs index 4f83ec4b3730f..796fca372b5bb 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -325,7 +325,7 @@ pub fn evaluate_partition_prefix<'a>( } } -fn filter_partitions( +pub fn filter_partitioned_file( pf: PartitionedFile, filters: &[Expr], df_schema: &DFSchema, @@ -447,7 +447,7 @@ pub async fn pruned_partition_list<'a>( )) }) .try_filter_map(move |pf| { - futures::future::ready(filter_partitions(pf, filters, &df_schema)) + futures::future::ready(filter_partitioned_file(pf, filters, &df_schema)) }) .boxed()) } diff --git a/datafusion/catalog-listing/src/options.rs b/datafusion/catalog-listing/src/options.rs index 930ff47a86db8..4f2b286a521d1 100644 --- a/datafusion/catalog-listing/src/options.rs +++ b/datafusion/catalog-listing/src/options.rs @@ -20,8 +20,7 @@ use datafusion_catalog::Session; use datafusion_common::plan_err; use datafusion_datasource::ListingTableUrl; use datafusion_datasource::file_format::FileFormat; -use datafusion_expr::SortExpr; -use datafusion_physical_expr::Partitioning; +use datafusion_expr::{Partitioning, SortExpr}; use futures::StreamExt; use futures::TryStreamExt; use itertools::Itertools; @@ -56,10 +55,16 @@ pub struct ListingOptions { pub file_sort_order: Vec>, /// Optional declared output partitioning for this table. /// - /// Expressions are specified against the full table schema. When set, - /// [`ListingTable`](crate::ListingTable) creates one scan file group per - /// declared output partition instead of the scan-time target partition - /// count. Empty file groups are added when needed to preserve that count. + /// This source declaration supports hash and range partitioning. + /// Expressions are logical expressions against the full table schema. 
This + /// declaration is authoritative: when set, [`ListingTable`](crate::ListingTable) + /// creates one scan file group per declared output partition instead of the + /// scan-time target partition count. Empty file groups are added when needed + /// to preserve that count. + /// + /// For range partitioning, split point values are validated against the + /// ordering expression types when planning the scan. Value-preserving casts + /// are accepted, but incompatible or lossy split point values are rejected. /// /// Files are sorted by path before grouping. DataFusion does not validate /// that rows match the declaration, so callers must ensure file group `i` @@ -129,7 +134,8 @@ impl ListingOptions { /// Set declared output partitioning on [`ListingOptions`] and returns self. /// /// See [`Self::output_partitioning`]. Empty file groups are added when - /// needed to preserve the declared partition count. + /// needed to preserve the declared partition count. Range split point values + /// are validated against the table schema when planning the scan. pub fn with_output_partitioning( mut self, output_partitioning: Option, diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 9794617b66ef5..c8bc8edaad131 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -16,14 +16,17 @@ // under the License. 
use crate::config::SchemaSource; -use crate::helpers::{expr_applicable_for_cols, pruned_partition_list}; +use crate::helpers::{ + expr_applicable_for_cols, filter_partitioned_file, pruned_partition_list, +}; use crate::{ListingOptions, ListingTableConfig}; -use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef}; +use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef}; use async_trait::async_trait; use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion_common::stats::Precision; use datafusion_common::{ - Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema, + Constraints, DFSchema, ScalarValue, SchemaExt, SplitPoint, Statistics, + internal_datafusion_err, plan_err, project_schema, }; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_groups::FileGroup; @@ -37,12 +40,17 @@ use datafusion_datasource::{ use datafusion_execution::cache::cache_manager::{FileStatisticsCache, TableScopedPath}; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; -use datafusion_physical_expr::create_lex_ordering; +use datafusion_expr::{ + Expr, Partitioning as LogicalPartitioning, TableProviderFilterPushDown, TableType, +}; +use datafusion_expr_common::casts::try_cast_literal_to_type; +use datafusion_physical_expr::{ + Partitioning as PhysicalPartitioning, RangePartitioning as PhysicalRangePartitioning, + create_lex_ordering, create_physical_expr, create_physical_sort_expr, +}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::ExecutionPlan; -use datafusion_physical_plan::Partitioning; use datafusion_physical_plan::empty::EmptyExec; use futures::{Stream, StreamExt, TryStreamExt, future, stream}; use object_store::ObjectStore; @@ -449,6 +457,112 @@ fn 
derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option datafusion_common::Result { + let df_schema = DFSchema::try_from(Arc::clone(schema))?; + match partitioning { + LogicalPartitioning::RoundRobinBatch(_) => { + datafusion_common::not_impl_err!( + "RoundRobinBatch output partitioning is not supported for ListingTable" + ) + } + LogicalPartitioning::Hash(exprs, partition_count) => { + let exprs = exprs + .iter() + .map(|expr| create_physical_expr(expr, &df_schema, execution_props)) + .collect::>>()?; + Ok(PhysicalPartitioning::Hash(exprs, *partition_count)) + } + LogicalPartitioning::Range(range) => { + let ordering = range + .ordering() + .iter() + .map(|expr| create_physical_sort_expr(expr, &df_schema, execution_props)) + .collect::>>()?; + let Some(ordering) = LexOrdering::new(ordering) else { + return plan_err!( + "Range output partitioning must have at least one ordering expression" + ); + }; + let split_points = + normalize_range_split_points(&ordering, range.split_points(), schema)?; + let range = PhysicalRangePartitioning::try_new(ordering, split_points)?; + Ok(PhysicalPartitioning::Range(range)) + } + LogicalPartitioning::DistributeBy(_) => { + datafusion_common::not_impl_err!( + "DistributeBy output partitioning is not supported for ListingTable" + ) + } + } +} + +fn normalize_range_split_points( + ordering: &LexOrdering, + split_points: &[SplitPoint], + schema: &SchemaRef, +) -> datafusion_common::Result> { + split_points + .iter() + .enumerate() + .map(|(split_idx, split_point)| { + let values = split_point + .values() + .iter() + .zip(ordering.iter()) + .enumerate() + .map(|(value_idx, (value, sort_expr))| { + let target_type = sort_expr.expr.data_type(schema.as_ref())?; + normalize_range_split_point_value( + value, + &target_type, + split_idx, + value_idx, + ) + }) + .collect::>>()?; + Ok(SplitPoint::new(values)) + }) + .collect() +} + +fn normalize_range_split_point_value( + value: &ScalarValue, + target_type: &DataType, + split_idx: 
usize, + value_idx: usize, +) -> datafusion_common::Result { + let value_type = value.data_type(); + if &value_type == target_type { + return Ok(value.clone()); + } + + if let Some(value) = try_cast_literal_to_type(value, target_type) { + return Ok(value); + } + + plan_err!( + "Range output partitioning split point {split_idx} value {value_idx} has type {value_type}, but ordering expression has type {target_type}" + ) +} + +fn filter_file_group_by_partition_filters( + file_group: FileGroup, + filters: &[Expr], + df_schema: &DFSchema, +) -> datafusion_common::Result { + let files = file_group + .into_inner() + .into_iter() + .map(|file| filter_partitioned_file(file, filters, df_schema)) + .filter_map(Result::transpose) + .collect::>>()?; + Ok(FileGroup::new(files)) +} + // Expressions can be used for partition pruning if they can be evaluated using // only the partition columns and there are partition columns. fn can_be_evaluated_for_partition_pruning( @@ -516,21 +630,18 @@ impl TableProvider for ListingTable { can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter) }); - // We should not limit the number of partitioned files to scan if there are filters and limit - // at the same time. This is because the limit should be applied after the filters are applied. - let statistic_file_limit = if filters.is_empty() { limit } else { None }; + let declared_output_partitioning = self.options.output_partitioning.as_ref(); - let declared_output_partitioning = if partition_filters.is_empty() { - self.options.output_partitioning.clone() - } else { - // Partition pruning can remove files before grouping. Without a - // stable file-to-declared-partition mapping, regrouping the - // remaining files could shift them into the wrong partition index. 
- None - }; - let target_partitions = declared_output_partitioning - .as_ref() - .map(Partitioning::partition_count) + // We should not limit files before assigning declared output partitions + // or before applying non-partition filters. + let statistic_file_limit = + if filters.is_empty() && declared_output_partitioning.is_none() { + limit + } else { + None + }; + let file_group_count = declared_output_partitioning + .and_then(LogicalPartitioning::partition_count) .unwrap_or_else(|| state.config().target_partitions()); let ListFilesResult { @@ -538,13 +649,7 @@ impl TableProvider for ListingTable { statistics, grouped_by_partition: partitioned_by_file_group, } = self - .list_files_for_scan_with_target( - state, - &partition_filters, - statistic_file_limit, - target_partitions, - declared_output_partitioning.is_some(), - ) + .list_files_for_scan(state, &partition_filters, statistic_file_limit) .await?; // if no files need to be read, return an `EmptyExec` @@ -569,7 +674,7 @@ impl TableProvider for ListingTable { &self.table_schema, &partitioned_file_lists, output_ordering, - target_partitions, + file_group_count, ) }) }) @@ -577,7 +682,7 @@ impl TableProvider for ListingTable { { Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"), Some(Ok(new_groups)) => { - if new_groups.len() <= target_partitions { + if new_groups.len() <= file_group_count { partitioned_file_lists = new_groups; } else { log::debug!( @@ -588,6 +693,26 @@ impl TableProvider for ListingTable { None => {} // no ordering required }; + let output_partitioning = if let Some(output_partitioning) = + declared_output_partitioning + { + let output_partitioning = create_physical_output_partitioning( + output_partitioning, + &self.table_schema, + state.execution_props(), + )?; + let partition_count = output_partitioning.partition_count(); + if partitioned_file_lists.len() != partition_count { + return plan_err!( + "ListingTable output_partitioning has {partition_count} partitions, 
but the scan has {} file groups", + partitioned_file_lists.len() + ); + } + Some(output_partitioning) + } else { + None + }; + let Some(object_store_url) = self.table_paths.first().map(ListingTableUrl::object_store) else { @@ -597,21 +722,17 @@ impl TableProvider for ListingTable { }; let file_source = self.create_file_source(); - let mut scan_config_builder = - FileScanConfigBuilder::new(object_store_url, file_source) - .with_file_groups(partitioned_file_lists) - .with_constraints(self.constraints.clone()) - .with_statistics(statistics) - .with_projection_indices(projection)? - .with_limit(limit) - .with_output_ordering(output_ordering) - .with_output_partitioning(declared_output_partitioning) - .with_expr_adapter(self.expr_adapter_factory.clone()); - if partitioned_by_file_group { - scan_config_builder = - scan_config_builder.with_partitioned_by_file_group(true); - } - let scan_config = scan_config_builder.build(); + let scan_config = FileScanConfigBuilder::new(object_store_url, file_source) + .with_file_groups(partitioned_file_lists) + .with_constraints(self.constraints.clone()) + .with_statistics(statistics) + .with_projection_indices(projection)? + .with_limit(limit) + .with_output_ordering(output_ordering) + .with_output_partitioning(output_partitioning) + .with_expr_adapter(self.expr_adapter_factory.clone()) + .with_partitioned_by_file_group(partitioned_by_file_group) + .build(); // create the execution plan let plan = self @@ -730,10 +851,8 @@ impl ListingTable { /// The list is grouped to let the execution plan know how the files should /// be distributed to different threads / executors. /// - /// If [`ListingOptions::output_partitioning`] is set, the returned file - /// groups preserve that declared partition count, including empty trailing - /// groups when needed, rather than using the scan-time target partition - /// count. 
+ /// If [`ListingOptions::output_partitioning`] is set, returns one file + /// group per declared partition, including empty trailing groups. pub async fn list_files_for_scan<'a>( &'a self, ctx: &'a dyn Session, @@ -741,28 +860,12 @@ impl ListingTable { limit: Option, ) -> datafusion_common::Result { let declared_output_partitioning = self.options.output_partitioning.as_ref(); - let target_partitions = declared_output_partitioning - .map(Partitioning::partition_count) + let file_group_count = declared_output_partitioning + .and_then(LogicalPartitioning::partition_count) .unwrap_or_else(|| ctx.config().target_partitions()); - self.list_files_for_scan_with_target( - ctx, - filters, - limit, - target_partitions, - declared_output_partitioning.is_some(), - ) - .await - } + let has_declared_partitioning = declared_output_partitioning.is_some(); - async fn list_files_for_scan_with_target<'a>( - &'a self, - ctx: &'a dyn Session, - filters: &'a [Expr], - limit: Option, - target_partitions: usize, - preserve_partition_count: bool, - ) -> datafusion_common::Result { - if target_partitions == 0 { + if file_group_count == 0 { return plan_err!( "ListingTable requires target_partitions to be greater than zero" ); @@ -777,13 +880,23 @@ impl ListingTable { grouped_by_partition: false, }); }; + // Listing-time pruning changes the file set before `split_files` + // assigns files to output partitions. For declared output partitioning, + // list the full file set, split into declared groups, then prune within + // each group below. 
+ let listing_time_filters = if has_declared_partitioning { + &[] + } else { + filters + }; + // list files (with partitions) let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| { pruned_partition_list( ctx, store.as_ref(), table_path, - filters, + listing_time_filters, &self.options.file_extension, &self.options.table_partition_cols, ) @@ -809,8 +922,13 @@ impl ListingTable { .boxed() .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency); + let file_limit = if has_declared_partitioning { + None + } else { + limit + }; let (file_group, inexact_stats) = - get_files_with_limit(files, limit, ctx.config().collect_statistics()).await?; + get_files_with_limit(files, file_limit, ctx.config().collect_statistics()).await?; // Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N // @@ -819,25 +937,45 @@ impl ListingTable { // hash repartitioning for aggregates and joins on partition columns. let threshold = ctx.config_options().optimizer.preserve_file_partitions; - let (mut file_groups, grouped_by_partition) = if preserve_partition_count { - (file_group.split_files(target_partitions), false) + let (mut file_groups, grouped_by_partition) = if has_declared_partitioning { + (file_group.split_files(file_group_count), false) } else if threshold > 0 && !self.options.table_partition_cols.is_empty() { - let grouped = file_group.group_by_partition_values(target_partitions); + let grouped = file_group.group_by_partition_values(file_group_count); if grouped.len() >= threshold { (grouped, true) } else { let all_files: Vec<_> = grouped.into_iter().flat_map(|g| g.into_inner()).collect(); ( - FileGroup::new(all_files).split_files(target_partitions), + FileGroup::new(all_files).split_files(file_group_count), false, ) } } else { - (file_group.split_files(target_partitions), false) + (file_group.split_files(file_group_count), false) }; - if preserve_partition_count && !file_groups.is_empty() { - 
file_groups.resize_with(target_partitions, || FileGroup::new(vec![])); + if has_declared_partitioning && !file_groups.is_empty() { + file_groups.resize_with(file_group_count, || FileGroup::new(vec![])); + } + + if has_declared_partitioning && !filters.is_empty() { + let df_schema = DFSchema::from_unqualified_fields( + self.options + .table_partition_cols + .iter() + .map(|(name, data_type)| Field::new(name, data_type.clone(), true)) + .collect(), + Default::default(), + )?; + + file_groups = file_groups + .into_iter() + .map(|file_group| { + filter_file_group_by_partition_filters( + file_group, filters, &df_schema, + ) + }) + .collect::>>()?; } let (file_groups, stats) = compute_all_files_statistics( diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 9d52e5447d64d..2842c2e97fe85 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -137,7 +137,10 @@ mod tests { use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_format::FileFormat; use datafusion_expr::dml::InsertOp; - use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; + use datafusion_expr::{ + BinaryExpr, LogicalPlanBuilder, Operator, Partitioning as LogicalPartitioning, + RangePartitioning as LogicalRangePartitioning, + }; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr::expressions::{Column, binary}; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -1301,11 +1304,19 @@ mod tests { let opt = ListingOptions::new(Arc::new(JsonFormat::default())) .with_file_extension_opt(Some("")) .with_target_partitions(1) - .with_output_partitioning(Some(Partitioning::RoundRobinBatch(4))); + .with_output_partitioning(Some(LogicalPartitioning::Range( + LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![ + SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), + 
SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + ], + )?, + ))); let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; - let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); let config = ListingTableConfig::new(table_path) .with_listing_options(opt) .with_schema(schema); @@ -1323,21 +1334,131 @@ mod tests { Ok(()) } + #[test] + fn test_listing_options_output_partitioning_overrides_target_partitions() -> Result<()> + { + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![ + SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + ], + )?); + + let output_partitioning_first = + ListingOptions::new(Arc::new(JsonFormat::default())) + .with_output_partitioning(Some(output_partitioning.clone())) + .with_target_partitions(1); + assert_eq!(output_partitioning_first.target_partitions, 4); + + let target_partitions_first = + ListingOptions::new(Arc::new(JsonFormat::default())) + .with_target_partitions(1) + .with_output_partitioning(Some(output_partitioning)); + assert_eq!(target_partitions_first.target_partitions, 4); + + Ok(()) + } + #[tokio::test] - async fn test_partition_filter_drops_declared_output_partitioning() -> Result<()> { + async fn test_range_output_partitioning_normalizes_split_point_types() -> Result<()> { + let files = ["bucket/key-prefix/file0"]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(10))])], + )?); + let 
expected_output_partitioning = + Partitioning::Range(RangePartitioning::try_new( + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + SortOptions::default(), + )]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::Int64(Some(10))])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_output_partitioning(Some(output_partitioning)); + + let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let scan = table.scan(&ctx.state(), None, &[], None).await?; + assert_eq!(scan.output_partitioning(), &expected_output_partitioning); + + Ok(()) + } + + #[tokio::test] + async fn test_range_output_partitioning_rejects_invalid_split_point_type() + -> Result<()> { + let files = ["bucket/key-prefix/file0"]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::Utf8(Some( + "not-an-int".to_string(), + ))])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_output_partitioning(Some(output_partitioning)); + + let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let err = table.scan(&ctx.state(), None, &[], None).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Range output 
partitioning split point 0 value 0 has type Utf8, but ordering expression has type Int32" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_partition_filter_preserves_declared_output_partitioning() -> Result<()> + { let files = ["bucket/test/pid=1/file1", "bucket/test/pid=2/file2"]; let ctx = SessionContext::new(); register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); - let output_partitioning = Partitioning::Range(RangePartitioning::try_new( - LexOrdering::new(vec![PhysicalSortExpr::new( - Arc::new(Column::new("pid", 1)), - SortOptions::default(), - )]) - .unwrap(), - vec![SplitPoint::new(vec![ScalarValue::Int32(Some(2))])], - )?); + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("pid").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(2))])], + )?); + let expected_output_partitioning = + Partitioning::Range(RangePartitioning::try_new( + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("pid", 1)), + SortOptions::default(), + )]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(2))])], + )?); let opt = ListingOptions::new(Arc::new(JsonFormat::default())) .with_file_extension_opt(Some("")) @@ -1353,14 +1474,38 @@ mod tests { let table = ListingTable::try_new(config)?; let unfiltered = table.scan(&ctx.state(), None, &[], None).await?; - assert_eq!(unfiltered.output_partitioning(), &output_partitioning); + assert_eq!( + unfiltered.output_partitioning(), + &expected_output_partitioning + ); let filter = Expr::eq(col("pid"), lit(2_i32)); + let file_groups = table + .list_files_for_scan(&ctx.state(), std::slice::from_ref(&filter), None) + .await? 
+ .file_groups + .into_iter() + .map(|group| { + group + .into_inner() + .into_iter() + .map(|file| file.path().to_string()) + .collect::>() + }) + .collect::>(); + assert_eq!( + file_groups, + vec![ + Vec::::new(), + vec!["bucket/test/pid=2/file2".to_string()] + ] + ); + let filtered = table.scan(&ctx.state(), None, &[filter], None).await?; - assert!(matches!( + assert_eq!( filtered.output_partitioning(), - Partitioning::UnknownPartitioning(1) - )); + &expected_output_partitioning + ); Ok(()) } diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 536df3ac49720..13b188f1e9dc2 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -210,8 +210,8 @@ pub struct FileScanConfig { /// /// Expressions are in terms of the full table schema, before scan /// projection or filtering. If the partition count does not match the - /// number of file groups, [`DataSource::output_partitioning`] falls back to - /// [`Partitioning::UnknownPartitioning`]. + /// number of file groups, [`DataSource::output_partitioning`] logs a warning + /// and falls back to [`Partitioning::UnknownPartitioning`]. pub output_partitioning: Option, } @@ -775,10 +775,13 @@ impl DataSource for FileScanConfig { /// Returns the output partitioning for this file scan. /// /// When output partitioning is declared, this returns it after remapping - /// through the scan projection. Otherwise, when `partitioned_by_file_group` - /// is true, this returns `Partitioning::Hash` on the Hive partition - /// columns, allowing the optimizer to skip repartitioning for compatible - /// aggregates and joins. + /// through the scan projection. If the declared partition count does not + /// match the number of file groups, this logs a warning and returns + /// [`Partitioning::UnknownPartitioning`] to avoid advertising an invalid + /// partitioning property. 
Otherwise, when `partitioned_by_file_group` is + /// true, this returns `Partitioning::Hash` on the Hive partition columns, + /// allowing the optimizer to skip repartitioning for compatible aggregates + /// and joins. /// /// Tradeoffs /// - Benefit: Eliminates `RepartitionExec` for compatible queries. @@ -801,7 +804,7 @@ impl DataSource for FileScanConfig { return Partitioning::UnknownPartitioning(self.file_groups.len()); }; if output_partitioning.partition_count() != self.file_groups.len() { - debug!( + warn!( "Declared output partitioning has {} partitions, but file scan has {} file groups. Falling back to UnknownPartitioning.", output_partitioning.partition_count(), self.file_groups.len() diff --git a/datafusion/sqllogictest/src/test_context/range_partitioning.rs b/datafusion/sqllogictest/src/test_context/range_partitioning.rs index 55e4f09663d90..a3e16eefd881a 100644 --- a/datafusion/sqllogictest/src/test_context/range_partitioning.rs +++ b/datafusion/sqllogictest/src/test_context/range_partitioning.rs @@ -19,16 +19,13 @@ use std::fs::{create_dir_all, remove_dir_all, write}; use std::path::Path; use std::sync::Arc; -use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema}; -use datafusion::common::ScalarValue; +use datafusion::common::{ScalarValue, SplitPoint}; use datafusion::datasource::file_format::csv::CsvFormat; use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; -use datafusion::physical_expr::expressions::Column; -use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; -use datafusion::physical_plan::{Partitioning, RangePartitioning, SplitPoint}; +use datafusion::logical_expr::{Partitioning, RangePartitioning, col}; use datafusion::prelude::SessionContext; // ============================================================================== @@ -43,14 +40,9 @@ pub(super) fn register_range_partitioned_table(ctx: &SessionContext) { Field::new("non_range_key", 
DataType::Int32, false), Field::new("value", DataType::Int32, false), ])); - let ordering = LexOrdering::new(vec![PhysicalSortExpr::new( - Arc::new(Column::new("range_key", 0)), - SortOptions::default(), - )]) - .expect("range ordering should not be empty"); let output_partitioning = Partitioning::Range( RangePartitioning::try_new( - ordering, + vec![col("range_key").sort(true, true)], vec![ SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), From ba15301de16ce79e7e84c134961c75e9ca4653b8 Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Fri, 12 Jun 2026 10:08:13 +0200 Subject: [PATCH 5/6] Reject lossy range split point casts --- datafusion/catalog-listing/src/options.rs | 35 +++---- datafusion/catalog-listing/src/table.rs | 13 ++- .../core/src/datasource/listing/table.rs | 94 ++++++++++++------- .../datasource/src/file_scan_config/mod.rs | 49 ++++------ 4 files changed, 103 insertions(+), 88 deletions(-) diff --git a/datafusion/catalog-listing/src/options.rs b/datafusion/catalog-listing/src/options.rs index 4f2b286a521d1..2126f4cf4091a 100644 --- a/datafusion/catalog-listing/src/options.rs +++ b/datafusion/catalog-listing/src/options.rs @@ -53,22 +53,19 @@ pub struct ListingOptions { /// multiple equivalent orderings, the outer `Vec` will have a /// single element. pub file_sort_order: Vec>, - /// Optional declared output partitioning for this table. - /// - /// This source declaration supports hash and range partitioning. - /// Expressions are logical expressions against the full table schema. This - /// declaration is authoritative: when set, [`ListingTable`](crate::ListingTable) - /// creates one scan file group per declared output partition instead of the - /// scan-time target partition count. Empty file groups are added when needed - /// to preserve that count. - /// - /// For range partitioning, split point values are validated against the - /// ordering expression types when planning the scan. 
Value-preserving casts - /// are accepted, but incompatible or lossy split point values are rejected. - /// - /// Files are sorted by path before grouping. DataFusion does not validate - /// that rows match the declaration, so callers must ensure file group `i` - /// contains only rows for declared output partition `i`. + /// Declared output partitioning for scans from this table. + /// + /// Expressions are logical expressions over the full table schema. When set, + /// [`ListingTable`](crate::ListingTable) creates one file group per + /// declared output partition, preserving empty groups. When unset, file + /// grouping uses the scan-time + /// [`SessionConfig::target_partitions`](datafusion_execution::config::SessionConfig::target_partitions). + /// Declarations are limited to partitioning that can be represented by + /// assigning whole files to file groups. + /// + /// Files are assigned to groups in path order. DataFusion does not validate + /// row placement, and callers must ensure file group `i` contains rows for + /// partition `i`. pub output_partitioning: Option, } @@ -131,11 +128,9 @@ impl ListingOptions { self } - /// Set declared output partitioning on [`ListingOptions`] and returns self. + /// Set declared output partitioning. /// - /// See [`Self::output_partitioning`]. Empty file groups are added when - /// needed to preserve the declared partition count. Range split point values - /// are validated against the table schema when planning the scan. + /// See [`Self::output_partitioning`] for the contract. 
pub fn with_output_partitioning( mut self, output_partitioning: Option, diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index c8bc8edaad131..8148a58bf8da0 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -540,12 +540,16 @@ fn normalize_range_split_point_value( return Ok(value.clone()); } - if let Some(value) = try_cast_literal_to_type(value, target_type) { - return Ok(value); + if let Some(casted) = try_cast_literal_to_type(value, target_type) { + // Range split points are physical metadata: normalization must not + // advertise a different boundary. + if try_cast_literal_to_type(&casted, &value_type).as_ref() == Some(value) { + return Ok(casted); + } } plan_err!( - "Range output partitioning split point {split_idx} value {value_idx} has type {value_type}, but ordering expression has type {target_type}" + "Range output partitioning split point {split_idx} value {value_idx} with type {value_type} cannot be represented exactly as ordering expression type {target_type}" ) } @@ -928,7 +932,8 @@ impl ListingTable { limit }; let (file_group, inexact_stats) = - get_files_with_limit(files, file_limit, ctx.config().collect_statistics()).await?; + get_files_with_limit(files, file_limit, ctx.config().collect_statistics()) + .await?; // Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N // diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 2842c2e97fe85..eb99bdbc94de3 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -123,7 +123,7 @@ mod tests { }, }; use arrow::{compute::SortOptions, record_batch::RecordBatch}; - use arrow_schema::{DataType, Field, Schema, SchemaRef}; + use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; use datafusion_catalog::TableProvider; use datafusion_catalog_listing::{ ListingOptions, 
ListingTable, ListingTableConfig, SchemaSource, @@ -1298,12 +1298,13 @@ mod tests { async fn test_list_files_uses_declared_output_partitioning_count() -> Result<()> { let files = ["bucket/key-prefix/file0", "bucket/key-prefix/file1"]; - let ctx = SessionContext::new(); + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_target_partitions(1), + ); register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); let opt = ListingOptions::new(Arc::new(JsonFormat::default())) .with_file_extension_opt(Some("")) - .with_target_partitions(1) .with_output_partitioning(Some(LogicalPartitioning::Range( LogicalRangePartitioning::try_new( vec![col("a").sort(true, true)], @@ -1334,34 +1335,6 @@ mod tests { Ok(()) } - #[test] - fn test_listing_options_output_partitioning_overrides_target_partitions() -> Result<()> - { - let output_partitioning = - LogicalPartitioning::Range(LogicalRangePartitioning::try_new( - vec![col("a").sort(true, true)], - vec![ - SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), - ], - )?); - - let output_partitioning_first = - ListingOptions::new(Arc::new(JsonFormat::default())) - .with_output_partitioning(Some(output_partitioning.clone())) - .with_target_partitions(1); - assert_eq!(output_partitioning_first.target_partitions, 4); - - let target_partitions_first = - ListingOptions::new(Arc::new(JsonFormat::default())) - .with_target_partitions(1) - .with_output_partitioning(Some(output_partitioning)); - assert_eq!(target_partitions_first.target_partitions, 4); - - Ok(()) - } - #[tokio::test] async fn test_range_output_partitioning_normalizes_split_point_types() -> Result<()> { let files = ["bucket/key-prefix/file0"]; @@ -1372,7 +1345,10 @@ mod tests { let output_partitioning = LogicalPartitioning::Range(LogicalRangePartitioning::try_new( vec![col("a").sort(true, true)], - 
vec![SplitPoint::new(vec![ScalarValue::Int32(Some(10))])], + vec![SplitPoint::new(vec![ScalarValue::TimestampNanosecond( + Some(123_000_000_000), + None, + )])], )?); let expected_output_partitioning = Partitioning::Range(RangePartitioning::try_new( @@ -1381,7 +1357,10 @@ mod tests { SortOptions::default(), )]) .unwrap(), - vec![SplitPoint::new(vec![ScalarValue::Int64(Some(10))])], + vec![SplitPoint::new(vec![ScalarValue::TimestampSecond( + Some(123), + None, + )])], )?); let opt = ListingOptions::new(Arc::new(JsonFormat::default())) @@ -1389,7 +1368,11 @@ mod tests { .with_output_partitioning(Some(output_partitioning)); let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Timestamp(TimeUnit::Second, None), + false, + )])); let config = ListingTableConfig::new(table_path) .with_listing_options(opt) .with_schema(schema); @@ -1431,7 +1414,48 @@ mod tests { let err = table.scan(&ctx.state(), None, &[], None).await.unwrap_err(); assert_contains!( err.to_string(), - "Range output partitioning split point 0 value 0 has type Utf8, but ordering expression has type Int32" + "Range output partitioning split point 0 value 0 with type Utf8 cannot be represented exactly as ordering expression type Int32" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_range_output_partitioning_rejects_lossy_timestamp_split_point() + -> Result<()> { + let files = ["bucket/key-prefix/file0"]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let output_partitioning = + LogicalPartitioning::Range(LogicalRangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::TimestampNanosecond( + Some(123_456), + None, + )])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + 
.with_file_extension_opt(Some("")) + .with_output_partitioning(Some(output_partitioning)); + + let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; + let schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Timestamp(TimeUnit::Second, None), + false, + )])); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let err = table.scan(&ctx.state(), None, &[], None).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Range output partitioning split point 0 value 0 with type Timestamp(ns) cannot be represented exactly as ordering expression type Timestamp(s)" ); Ok(()) diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 13b188f1e9dc2..76604e2ad03dc 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -198,20 +198,23 @@ pub struct FileScanConfig { /// would be incorrect if there are filters being applied, thus this should be accessed /// via [`FileScanConfig::statistics`]. pub(crate) statistics: Statistics, - /// When true, file_groups are organized by partition column values - /// and output_partitioning will return Hash partitioning on partition columns. - /// This allows the optimizer to skip hash repartitioning for aggregates and joins - /// on partition columns. + /// When true, `file_groups` are organized by partition column values and + /// [`Self::output_partitioning`] derives hash partitioning on those columns. + /// This allows the optimizer to skip hash repartitioning for aggregates and + /// joins on partition columns. + /// + /// Because grouping is by whole file, this may reduce I/O parallelism when + /// partition sizes are uneven. 
/// /// If the number of file partitions > target_partitions, the file partitions will be grouped /// in a round-robin fashion such that number of file partitions = target_partitions. pub partitioned_by_file_group: bool, - /// Optional declared output partitioning of this file scan. + /// Declared physical output partitioning for this scan. /// - /// Expressions are in terms of the full table schema, before scan - /// projection or filtering. If the partition count does not match the - /// number of file groups, [`DataSource::output_partitioning`] logs a warning - /// and falls back to [`Partitioning::UnknownPartitioning`]. + /// Expressions are against the full table schema, before scan projection or + /// filtering. `ListingTable` validates partition count before building the + /// scan, and direct builders with mismatched counts fall back to + /// `UnknownPartitioning`. pub output_partitioning: Option, } @@ -472,9 +475,7 @@ impl FileScanConfigBuilder { self } - /// Set declared output partitioning for this scan. - /// - /// See [`FileScanConfig::output_partitioning`]. + /// Set declared physical output partitioning for this scan. pub fn with_output_partitioning( mut self, output_partitioning: Option, @@ -772,25 +773,15 @@ impl DataSource for FileScanConfig { Ok(source.map(|s| Arc::new(s) as _)) } - /// Returns the output partitioning for this file scan. - /// - /// When output partitioning is declared, this returns it after remapping - /// through the scan projection. If the declared partition count does not - /// match the number of file groups, this logs a warning and returns - /// [`Partitioning::UnknownPartitioning`] to avoid advertising an invalid - /// partitioning property. Otherwise, when `partitioned_by_file_group` is - /// true, this returns `Partitioning::Hash` on the Hive partition columns, - /// allowing the optimizer to skip repartitioning for compatible aggregates - /// and joins. 
+ /// Returns declared or derived output partitioning for this file scan. /// - /// Tradeoffs - /// - Benefit: Eliminates `RepartitionExec` for compatible queries. - /// - Cost: File groups must remain intact, so byte-range file splitting - /// and sibling work stealing are disabled. + /// Declared partitioning is projected through the scan projection. If it + /// cannot be projected, or its partition count differs from `file_groups`, + /// this returns `UnknownPartitioning`. /// - /// Follow-up Work - /// - Idea: Could allow byte-range splitting within each output partition, - /// preserving I/O parallelism while maintaining partition semantics. + /// Without a declaration, `partitioned_by_file_group` derives hash + /// partitioning from Hive partition columns. Otherwise this returns + /// `UnknownPartitioning`. fn output_partitioning(&self) -> Partitioning { let Some(output_partitioning) = self.output_partitioning.clone().or_else(|| { self.partitioned_by_file_group.then(|| { From 03e860f61c3ead2f9392df1c5b2603914b7ef99a Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Mon, 22 Jun 2026 16:52:03 -0400 Subject: [PATCH 6/6] PR comments --- Cargo.lock | 1 - datafusion/catalog-listing/Cargo.toml | 1 - datafusion/catalog-listing/src/options.rs | 40 ++- datafusion/catalog-listing/src/table.rs | 334 ++++++++---------- .../core/src/datasource/listing/table.rs | 121 ++++--- datafusion/core/src/physical_planner.rs | 61 +--- .../datasource/src/file_scan_config/mod.rs | 38 +- datafusion/physical-expr/src/lib.rs | 5 +- datafusion/physical-expr/src/physical_expr.rs | 101 +++++- 9 files changed, 394 insertions(+), 308 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a4f20417cdb41..4bd37c6871cd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1819,7 +1819,6 @@ dependencies = [ "datafusion-datasource-parquet", "datafusion-execution", "datafusion-expr", - "datafusion-expr-common", "datafusion-physical-expr", "datafusion-physical-expr-adapter", 
"datafusion-physical-expr-common", diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml index 417bc0d0ac710..61b55397137df 100644 --- a/datafusion/catalog-listing/Cargo.toml +++ b/datafusion/catalog-listing/Cargo.toml @@ -38,7 +38,6 @@ datafusion-common = { workspace = true, features = ["object_store"] } datafusion-datasource = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } -datafusion-expr-common = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-adapter = { workspace = true } datafusion-physical-expr-common = { workspace = true } diff --git a/datafusion/catalog-listing/src/options.rs b/datafusion/catalog-listing/src/options.rs index 2126f4cf4091a..8e14fce341df5 100644 --- a/datafusion/catalog-listing/src/options.rs +++ b/datafusion/catalog-listing/src/options.rs @@ -57,15 +57,41 @@ pub struct ListingOptions { /// /// Expressions are logical expressions over the full table schema. When set, /// [`ListingTable`](crate::ListingTable) creates one file group per - /// declared output partition, preserving empty groups. When unset, file - /// grouping uses the scan-time + /// declared output partition. When unset, file grouping uses the scan-time /// [`SessionConfig::target_partitions`](datafusion_execution::config::SessionConfig::target_partitions). - /// Declarations are limited to partitioning that can be represented by - /// assigning whole files to file groups. /// - /// Files are assigned to groups in path order. DataFusion does not validate - /// row placement, and callers must ensure file group `i` contains rows for - /// partition `i`. + /// Files are listed in path order, split into whole-file groups across the + /// declared partition count, and then padded with trailing empty groups when + /// needed. 
DataFusion does not route files by partition values or validate + /// row placement, so callers must ensure file group `i` contains rows for + /// partition `i`. Layouts that require explicit file-to-partition assignment + /// are not supported. + /// + /// For example, range partitioning on column `a` with split points + /// `[10, 20, 30]` declares four output partitions. With three path-ordered + /// files, the trailing partition is preserved as empty: + /// + /// ```text + /// files in path order: f0, f1, f2 + /// + /// file groups: + /// partition 0: [f0] + /// partition 1: [f1] + /// partition 2: [f2] + /// partition 3: [] + /// ``` + /// + /// With five path-ordered files, a partition can contain multiple files: + /// + /// ```text + /// files in path order: f0, f1, f2, f3, f4 + /// + /// file groups: + /// partition 0: [f0, f1] + /// partition 1: [f2, f3] + /// partition 2: [f4] + /// partition 3: [] + /// ``` pub output_partitioning: Option, } diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 8148a58bf8da0..36d85b981c06c 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -20,13 +20,13 @@ use crate::helpers::{ expr_applicable_for_cols, filter_partitioned_file, pruned_partition_list, }; use crate::{ListingOptions, ListingTableConfig}; -use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef}; +use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef}; use async_trait::async_trait; use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion_common::stats::Precision; use datafusion_common::{ - Constraints, DFSchema, ScalarValue, SchemaExt, SplitPoint, Statistics, - internal_datafusion_err, plan_err, project_schema, + Constraints, DFSchema, SchemaExt, Statistics, internal_datafusion_err, plan_err, + project_schema, }; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_groups::FileGroup; 
@@ -43,11 +43,7 @@ use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{ Expr, Partitioning as LogicalPartitioning, TableProviderFilterPushDown, TableType, }; -use datafusion_expr_common::casts::try_cast_literal_to_type; -use datafusion_physical_expr::{ - Partitioning as PhysicalPartitioning, RangePartitioning as PhysicalRangePartitioning, - create_lex_ordering, create_physical_expr, create_physical_sort_expr, -}; +use datafusion_physical_expr::{create_lex_ordering, create_physical_partitioning}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::ExecutionPlan; @@ -457,102 +453,6 @@ fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option datafusion_common::Result { - let df_schema = DFSchema::try_from(Arc::clone(schema))?; - match partitioning { - LogicalPartitioning::RoundRobinBatch(_) => { - datafusion_common::not_impl_err!( - "RoundRobinBatch output partitioning is not supported for ListingTable" - ) - } - LogicalPartitioning::Hash(exprs, partition_count) => { - let exprs = exprs - .iter() - .map(|expr| create_physical_expr(expr, &df_schema, execution_props)) - .collect::>>()?; - Ok(PhysicalPartitioning::Hash(exprs, *partition_count)) - } - LogicalPartitioning::Range(range) => { - let ordering = range - .ordering() - .iter() - .map(|expr| create_physical_sort_expr(expr, &df_schema, execution_props)) - .collect::>>()?; - let Some(ordering) = LexOrdering::new(ordering) else { - return plan_err!( - "Range output partitioning must have at least one ordering expression" - ); - }; - let split_points = - normalize_range_split_points(&ordering, range.split_points(), schema)?; - let range = PhysicalRangePartitioning::try_new(ordering, split_points)?; - Ok(PhysicalPartitioning::Range(range)) - } - LogicalPartitioning::DistributeBy(_) => { - datafusion_common::not_impl_err!( - "DistributeBy output partitioning is not supported 
for ListingTable" - ) - } - } -} - -fn normalize_range_split_points( - ordering: &LexOrdering, - split_points: &[SplitPoint], - schema: &SchemaRef, -) -> datafusion_common::Result> { - split_points - .iter() - .enumerate() - .map(|(split_idx, split_point)| { - let values = split_point - .values() - .iter() - .zip(ordering.iter()) - .enumerate() - .map(|(value_idx, (value, sort_expr))| { - let target_type = sort_expr.expr.data_type(schema.as_ref())?; - normalize_range_split_point_value( - value, - &target_type, - split_idx, - value_idx, - ) - }) - .collect::>>()?; - Ok(SplitPoint::new(values)) - }) - .collect() -} - -fn normalize_range_split_point_value( - value: &ScalarValue, - target_type: &DataType, - split_idx: usize, - value_idx: usize, -) -> datafusion_common::Result { - let value_type = value.data_type(); - if &value_type == target_type { - return Ok(value.clone()); - } - - if let Some(casted) = try_cast_literal_to_type(value, target_type) { - // Range split points are physical metadata: normalization must not - // advertise a different boundary. 
- if try_cast_literal_to_type(&casted, &value_type).as_ref() == Some(value) { - return Ok(casted); - } - } - - plan_err!( - "Range output partitioning split point {split_idx} value {value_idx} with type {value_type} cannot be represented exactly as ordering expression type {target_type}" - ) -} - fn filter_file_group_by_partition_filters( file_group: FileGroup, filters: &[Expr], @@ -700,11 +600,26 @@ impl TableProvider for ListingTable { let output_partitioning = if let Some(output_partitioning) = declared_output_partitioning { - let output_partitioning = create_physical_output_partitioning( - output_partitioning, - &self.table_schema, - state.execution_props(), - )?; + let output_partitioning = match output_partitioning { + LogicalPartitioning::RoundRobinBatch(_) => { + return datafusion_common::not_impl_err!( + "RoundRobinBatch output partitioning is not supported for ListingTable" + ); + } + LogicalPartitioning::DistributeBy(_) => { + return datafusion_common::not_impl_err!( + "DistributeBy output partitioning is not supported for ListingTable" + ); + } + LogicalPartitioning::Hash(_, _) | LogicalPartitioning::Range(_) => { + let df_schema = DFSchema::try_from(Arc::clone(&self.table_schema))?; + create_physical_partitioning( + output_partitioning, + &df_schema, + state.execution_props(), + )? 
+ } + }; let partition_count = output_partitioning.partition_count(); if partitioned_file_lists.len() != partition_count { return plan_err!( @@ -863,37 +778,25 @@ impl ListingTable { filters: &'a [Expr], limit: Option, ) -> datafusion_common::Result { - let declared_output_partitioning = self.options.output_partitioning.as_ref(); - let file_group_count = declared_output_partitioning - .and_then(LogicalPartitioning::partition_count) - .unwrap_or_else(|| ctx.config().target_partitions()); - let has_declared_partitioning = declared_output_partitioning.is_some(); - - if file_group_count == 0 { - return plan_err!( - "ListingTable requires target_partitions to be greater than zero" - ); - } - - let store = if let Some(url) = self.table_paths.first() { - ctx.runtime_env().object_store(url)? - } else { - return Ok(ListFilesResult { - file_groups: vec![], - statistics: Statistics::new_unknown(&self.file_schema), - grouped_by_partition: false, - }); - }; - // Listing-time pruning changes the file set before `split_files` - // assigns files to output partitions. For declared output partitioning, - // list the full file set, split into declared groups, then prune within - // each group below. 
- let listing_time_filters = if has_declared_partitioning { - &[] + if let Some(output_partitioning) = self.options.output_partitioning.as_ref() { + self.list_files_for_declared_output_partitioning( + ctx, + output_partitioning, + filters, + ) + .await } else { - filters - }; + self.list_files_for_regular_scan(ctx, filters, limit).await + } + } + async fn collect_files_for_scan<'a>( + &'a self, + ctx: &'a dyn Session, + store: &'a Arc, + listing_time_filters: &'a [Expr], + file_limit: Option, + ) -> datafusion_common::Result<(FileGroup, bool)> { // list files (with partitions) let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| { pruned_partition_list( @@ -914,7 +817,7 @@ impl ListingTable { .map(|part_file| async { let part_file = part_file?; let (statistics, ordering) = if ctx.config().collect_statistics() { - self.do_collect_statistics_and_ordering(ctx, &store, &part_file) + self.do_collect_statistics_and_ordering(ctx, store, &part_file) .await? } else { (Arc::new(Statistics::new_unknown(&self.file_schema)), None) @@ -926,14 +829,34 @@ impl ListingTable { .boxed() .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency); - let file_limit = if has_declared_partitioning { - None + get_files_with_limit(files, file_limit, ctx.config().collect_statistics()).await + } + + async fn list_files_for_regular_scan<'a>( + &'a self, + ctx: &'a dyn Session, + filters: &'a [Expr], + limit: Option, + ) -> datafusion_common::Result { + let file_group_count = ctx.config().target_partitions(); + if file_group_count == 0 { + return plan_err!( + "ListingTable requires target_partitions to be greater than zero" + ); + } + + let store = if let Some(url) = self.table_paths.first() { + ctx.runtime_env().object_store(url)? 
} else { - limit + return Ok(ListFilesResult { + file_groups: vec![], + statistics: Statistics::new_unknown(&self.file_schema), + grouped_by_partition: false, + }); }; - let (file_group, inexact_stats) = - get_files_with_limit(files, file_limit, ctx.config().collect_statistics()) - .await?; + let (file_group, inexact_stats) = self + .collect_files_for_scan(ctx, &store, filters, limit) + .await?; // Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N // @@ -942,47 +865,102 @@ impl ListingTable { // hash repartitioning for aggregates and joins on partition columns. let threshold = ctx.config_options().optimizer.preserve_file_partitions; - let (mut file_groups, grouped_by_partition) = if has_declared_partitioning { - (file_group.split_files(file_group_count), false) - } else if threshold > 0 && !self.options.table_partition_cols.is_empty() { - let grouped = file_group.group_by_partition_values(file_group_count); - if grouped.len() >= threshold { - (grouped, true) + let (file_groups, grouped_by_partition) = + if threshold > 0 && !self.options.table_partition_cols.is_empty() { + let grouped = file_group.group_by_partition_values(file_group_count); + if grouped.len() >= threshold { + (grouped, true) + } else { + let all_files: Vec<_> = + grouped.into_iter().flat_map(|g| g.into_inner()).collect(); + ( + FileGroup::new(all_files).split_files(file_group_count), + false, + ) + } } else { - let all_files: Vec<_> = - grouped.into_iter().flat_map(|g| g.into_inner()).collect(); - ( - FileGroup::new(all_files).split_files(file_group_count), - false, - ) - } + (file_group.split_files(file_group_count), false) + }; + + self.list_files_result_from_groups( + ctx, + file_groups, + inexact_stats, + grouped_by_partition, + ) + } + + async fn list_files_for_declared_output_partitioning<'a>( + &'a self, + ctx: &'a dyn Session, + output_partitioning: &LogicalPartitioning, + filters: &'a [Expr], + ) -> datafusion_common::Result { + let Some(file_group_count) = 
output_partitioning.partition_count() else { + return datafusion_common::not_impl_err!( + "DistributeBy output partitioning is not supported for ListingTable" + ); + }; + if file_group_count == 0 { + return plan_err!( + "ListingTable output_partitioning requires at least one partition" + ); + } + + let store = if let Some(url) = self.table_paths.first() { + ctx.runtime_env().object_store(url)? } else { - (file_group.split_files(file_group_count), false) + return Ok(ListFilesResult { + file_groups: vec![], + statistics: Statistics::new_unknown(&self.file_schema), + grouped_by_partition: false, + }); }; - if has_declared_partitioning && !file_groups.is_empty() { + let (file_group, inexact_stats) = + self.collect_files_for_scan(ctx, &store, &[], None).await?; + let mut file_groups = file_group.split_files(file_group_count); + if !file_groups.is_empty() { file_groups.resize_with(file_group_count, || FileGroup::new(vec![])); } + let file_groups = + self.filter_declared_file_groups_by_partition_filters(file_groups, filters)?; - if has_declared_partitioning && !filters.is_empty() { - let df_schema = DFSchema::from_unqualified_fields( - self.options - .table_partition_cols - .iter() - .map(|(name, data_type)| Field::new(name, data_type.clone(), true)) - .collect(), - Default::default(), - )?; - - file_groups = file_groups - .into_iter() - .map(|file_group| { - filter_file_group_by_partition_filters( - file_group, filters, &df_schema, - ) - }) - .collect::>>()?; + self.list_files_result_from_groups(ctx, file_groups, inexact_stats, false) + } + + fn filter_declared_file_groups_by_partition_filters( + &self, + file_groups: Vec, + filters: &[Expr], + ) -> datafusion_common::Result> { + if filters.is_empty() { + return Ok(file_groups); } + let df_schema = DFSchema::from_unqualified_fields( + self.options + .table_partition_cols + .iter() + .map(|(name, data_type)| Field::new(name, data_type.clone(), true)) + .collect(), + Default::default(), + )?; + + file_groups + .into_iter() 
+ .map(|file_group| { + filter_file_group_by_partition_filters(file_group, filters, &df_schema) + }) + .collect::>>() + } + + fn list_files_result_from_groups( + &self, + ctx: &dyn Session, + file_groups: Vec, + inexact_stats: bool, + grouped_by_partition: bool, + ) -> datafusion_common::Result { let (file_groups, stats) = compute_all_files_statistics( file_groups, self.schema(), diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index eb99bdbc94de3..39c20f9b786c2 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -181,6 +181,21 @@ mod tests { .collect() } + fn listing_table_with_files( + ctx: &SessionContext, + files: &[&str], + table_path: &str, + options: ListingOptions, + schema: Schema, + ) -> Result { + register_test_store(ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let config = ListingTableConfig::new(ListingTableUrl::parse(table_path)?) + .with_listing_options(options) + .with_schema(Arc::new(schema)); + ListingTable::try_new(config) + } + #[tokio::test] async fn test_schema_source_tracking_comprehensive() -> Result<()> { let ctx = SessionContext::new(); @@ -1301,27 +1316,25 @@ mod tests { let ctx = SessionContext::new_with_config( SessionConfig::new().with_target_partitions(1), ); - register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); - let opt = ListingOptions::new(Arc::new(JsonFormat::default())) .with_file_extension_opt(Some("")) .with_output_partitioning(Some(LogicalPartitioning::Range( LogicalRangePartitioning::try_new( vec![col("a").sort(true, true)], vec![ - SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + SplitPoint::new(vec![ScalarValue::from(10i32)]), + SplitPoint::new(vec![ScalarValue::from(20i32)]), + SplitPoint::new(vec![ScalarValue::from(30i32)]), ], )?, ))); - - let 
table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); - let config = ListingTableConfig::new(table_path) - .with_listing_options(opt) - .with_schema(schema); - let table = ListingTable::try_new(config)?; + let table = listing_table_with_files( + &ctx, + &files, + "test:///bucket/key-prefix/", + opt, + Schema::new(vec![Field::new("a", DataType::Int32, false)]), + )?; let result = table.list_files_for_scan(&ctx.state(), &[], None).await?; let group_sizes = result @@ -1340,8 +1353,6 @@ mod tests { let files = ["bucket/key-prefix/file0"]; let ctx = SessionContext::new(); - register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>()); - let output_partitioning = LogicalPartitioning::Range(LogicalRangePartitioning::try_new( vec![col("a").sort(true, true)], @@ -1366,17 +1377,17 @@ mod tests { let opt = ListingOptions::new(Arc::new(JsonFormat::default())) .with_file_extension_opt(Some("")) .with_output_partitioning(Some(output_partitioning)); - - let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; - let schema = Arc::new(Schema::new(vec![Field::new( - "a", - DataType::Timestamp(TimeUnit::Second, None), - false, - )])); - let config = ListingTableConfig::new(table_path) - .with_listing_options(opt) - .with_schema(schema); - let table = ListingTable::try_new(config)?; + let table = listing_table_with_files( + &ctx, + &files, + "test:///bucket/key-prefix/", + opt, + Schema::new(vec![Field::new( + "a", + DataType::Timestamp(TimeUnit::Second, None), + false, + )]), + )?; let scan = table.scan(&ctx.state(), None, &[], None).await?; assert_eq!(scan.output_partitioning(), &expected_output_partitioning); @@ -1390,8 +1401,6 @@ mod tests { let files = ["bucket/key-prefix/file0"]; let ctx = SessionContext::new(); - register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>()); - let output_partitioning =
LogicalPartitioning::Range(LogicalRangePartitioning::try_new( vec![col("a").sort(true, true)], @@ -1403,13 +1412,13 @@ mod tests { let opt = ListingOptions::new(Arc::new(JsonFormat::default())) .with_file_extension_opt(Some("")) .with_output_partitioning(Some(output_partitioning)); - - let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); - let config = ListingTableConfig::new(table_path) - .with_listing_options(opt) - .with_schema(schema); - let table = ListingTable::try_new(config)?; + let table = listing_table_with_files( + &ctx, + &files, + "test:///bucket/key-prefix/", + opt, + Schema::new(vec![Field::new("a", DataType::Int32, false)]), + )?; let err = table.scan(&ctx.state(), None, &[], None).await.unwrap_err(); assert_contains!( @@ -1426,8 +1435,6 @@ mod tests { let files = ["bucket/key-prefix/file0"]; let ctx = SessionContext::new(); - register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>()); - let output_partitioning = LogicalPartitioning::Range(LogicalRangePartitioning::try_new( vec![col("a").sort(true, true)], @@ -1440,17 +1447,17 @@ mod tests { let opt = ListingOptions::new(Arc::new(JsonFormat::default())) .with_file_extension_opt(Some("")) .with_output_partitioning(Some(output_partitioning)); - - let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; - let schema = Arc::new(Schema::new(vec![Field::new( - "a", - DataType::Timestamp(TimeUnit::Second, None), - false, - )])); - let config = ListingTableConfig::new(table_path) - .with_listing_options(opt) - .with_schema(schema); - let table = ListingTable::try_new(config)?; + let table = listing_table_with_files( + &ctx, + &files, + "test:///bucket/key-prefix/", + opt, + Schema::new(vec![Field::new( + "a", + DataType::Timestamp(TimeUnit::Second, None), + false, + )]), + )?; let err = table.scan(&ctx.state(), None, &[], None).await.unwrap_err(); assert_contains!( @@ 
-1467,12
+1474,10 @@ mod tests { let files = ["bucket/test/pid=1/file1", "bucket/test/pid=2/file2"]; let ctx = SessionContext::new(); - register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>()); - let output_partitioning = LogicalPartitioning::Range(LogicalRangePartitioning::try_new( vec![col("pid").sort(true, true)], - vec![SplitPoint::new(vec![ScalarValue::Int32(Some(2))])], + vec![SplitPoint::new(vec![ScalarValue::from(2i32)])], )?); let expected_output_partitioning = Partitioning::Range(RangePartitioning::try_new( @@ -1481,7 +1486,7 @@ mod tests { SortOptions::default(), )]) .unwrap(), - vec![SplitPoint::new(vec![ScalarValue::Int32(Some(2))])], + vec![SplitPoint::new(vec![ScalarValue::from(2i32)])], )?); let opt = ListingOptions::new(Arc::new(JsonFormat::default())) @@ -1489,13 +1494,13 @@ mod tests { .with_table_partition_cols(vec![("pid".to_string(), DataType::Int32)]) .with_output_partitioning(Some(output_partitioning.clone())); - let table_path = ListingTableUrl::parse("test:///bucket/test/")?; - let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); - let config = ListingTableConfig::new(table_path) - .with_listing_options(opt) - .with_schema(schema); - let table = ListingTable::try_new(config)?; + let table = listing_table_with_files( + &ctx, + &files, + "test:///bucket/test/", + opt, + Schema::new(vec![Field::new("a", DataType::Boolean, false)]), + )?; let unfiltered = table.scan(&ctx.state(), None, &[], None).await?; assert_eq!( diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 190a08da12222..9cd2fd1131a87 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -32,10 +32,11 @@ use crate::logical_expr::{ Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, Values, Window, }; use crate::logical_expr::{ - Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType, Repartition, - 
UserDefinedLogicalNode,
+ Expr, LogicalPlan, PlanType, Repartition, UserDefinedLogicalNode, +}; +use crate::physical_expr::{ + create_physical_expr, create_physical_exprs, create_physical_partitioning, }; -use crate::physical_expr::{create_physical_expr, create_physical_exprs}; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::explain::ExplainExec; @@ -52,8 +53,8 @@ use crate::physical_plan::union::UnionExec; use crate::physical_plan::unnest::UnnestExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ - ExecutionPlan, ExecutionPlanProperties, InputOrderMode, Partitioning, PhysicalExpr, - WindowExpr, displayable, windows, + ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr, WindowExpr, + displayable, windows, }; use crate::schema_equivalence::schema_satisfied_by; @@ -98,7 +99,7 @@ use datafusion_physical_expr::aggregate::{ }; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::{ - LexOrdering, PhysicalSortExpr, RangePartitioning, create_physical_sort_exprs, + LexOrdering, PhysicalSortExpr, create_physical_sort_exprs, }; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::empty::EmptyExec; @@ -1251,41 +1252,11 @@ impl DefaultPhysicalPlanner { }) => { let physical_input = children.one()?; let input_dfschema = input.as_ref().schema(); - let physical_partitioning = match partitioning_scheme { - LogicalPartitioning::RoundRobinBatch(n) => { - Partitioning::RoundRobinBatch(*n) - } - LogicalPartitioning::Hash(expr, n) => { - let runtime_expr = expr - .iter() - .map(|e| { - create_physical_expr(e, input_dfschema, execution_props) - }) - .collect::<Result<Vec<_>>>()?; - Partitioning::Hash(runtime_expr, *n) - } - LogicalPartitioning::Range(range) => { - let sort_exprs = create_physical_sort_exprs( - range.ordering(), - input_dfschema, - 
execution_props, - )?; - let
ordering = LexOrdering::new(sort_exprs).ok_or_else(|| { - internal_datafusion_err!( - "Range repartitioning requires non-empty ordering" - ) - })?; - Partitioning::Range(RangePartitioning::try_new( - ordering, - range.split_points().to_vec(), - )?) - } - LogicalPartitioning::DistributeBy(_) => { - return not_impl_err!( - "Physical plan does not support DistributeBy partitioning" - ); - } - }; + let physical_partitioning = create_physical_partitioning( + partitioning_scheme, + input_dfschema, + execution_props, + )?; Arc::new(RepartitionExec::try_new( physical_input, physical_partitioning, @@ -3249,8 +3220,8 @@ mod tests { use crate::datasource::MemTable; use crate::datasource::file_format::options::CsvReadOptions; use crate::physical_plan::{ - DisplayAs, DisplayFormatType, PlanProperties, SendableRecordBatchStream, - expressions, + DisplayAs, DisplayFormatType, Partitioning, PlanProperties, + SendableRecordBatchStream, expressions, }; use crate::prelude::{SessionConfig, SessionContext}; use crate::test_util::{scan_empty, scan_empty_with_partitions}; @@ -3271,8 +3242,8 @@ mod tests { use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::{ Accumulator, AggregateUDF, AggregateUDFImpl, ExprFunctionExt, LogicalPlanBuilder, - RangePartitioning, Signature, TableSource, UserDefinedLogicalNodeCore, - Volatility, WindowFunctionDefinition, col, lit, + Partitioning as LogicalPartitioning, RangePartitioning, Signature, TableSource, + UserDefinedLogicalNodeCore, Volatility, WindowFunctionDefinition, col, lit, }; use datafusion_functions_aggregate::count::{count_all, count_udaf}; use datafusion_functions_aggregate::expr_fn::sum; diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 76604e2ad03dc..21d733458cbc9 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -198,16 +198,16 @@ pub struct FileScanConfig { /// 
would be incorrect if there are filters being applied, thus this should be accessed /// via [`FileScanConfig::statistics`]. pub(crate) statistics: Statistics, - /// When true, `file_groups` are organized by partition column values and - /// [`Self::output_partitioning`] derives hash partitioning on those columns. - /// This allows the optimizer to skip hash repartitioning for aggregates and - /// joins on partition columns. - /// - /// Because grouping is by whole file, this may reduce I/O parallelism when - /// partition sizes are uneven. + /// When true, file_groups are organized by partition column values + /// and output_partitioning will return Hash partitioning on partition columns. + /// This allows the optimizer to skip hash repartitioning for aggregates and joins + /// on partition columns. /// /// If the number of file partitions > target_partitions, the file partitions will be grouped /// in a round-robin fashion such that number of file partitions = target_partitions. + /// + /// Follow-up: remove this redundant field in favor of + /// `output_partitioning`, see . pub partitioned_by_file_group: bool, /// Declared physical output partitioning for this scan. /// @@ -773,15 +773,27 @@ impl DataSource for FileScanConfig { Ok(source.map(|s| Arc::new(s) as _)) } - /// Returns declared or derived output partitioning for this file scan. + /// Returns the output partitioning for this file scan. /// - /// Declared partitioning is projected through the scan projection. If it - /// cannot be projected, or its partition count differs from `file_groups`, - /// this returns `UnknownPartitioning`. + /// When `output_partitioning` is set, this returns the declared partitioning + /// after applying scan projection. When `partitioned_by_file_group` is true, + /// this returns `Partitioning::Hash` on the Hive partition columns, allowing + /// the optimizer to skip hash repartitioning for aggregates and joins on + /// those columns. 
/// - /// Without a declaration, `partitioned_by_file_group` derives hash - /// partitioning from Hive partition columns. Otherwise this returns + /// If projection or partition count validation fails, this returns /// `UnknownPartitioning`. + /// + /// Tradeoffs + /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries whose + /// required distribution is satisfied by the scan's output partitioning. + /// - Cost: Files are grouped by partition values rather than split by byte + /// ranges, which may reduce I/O parallelism when partition sizes are uneven. + /// For simple aggregations without `ORDER BY`, this cost may outweigh the benefit. + /// + /// Follow-up Work + /// - Idea: Could allow byte-range splitting within partition-aware groups, + /// preserving I/O parallelism while maintaining partition semantics. fn output_partitioning(&self) -> Partitioning { let Some(output_partitioning) = self.output_partitioning.clone().or_else(|| { self.partitioned_by_file_group.then(|| { diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index b55bd70bdf185..67419944cfde6 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -66,8 +66,9 @@ pub use expressions::{DynamicFilterTracker, DynamicFilterTracking}; pub use partitioning::{Distribution, Partitioning, RangePartitioning}; pub use physical_expr::{ add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering, - create_ordering, create_physical_sort_expr, create_physical_sort_exprs, - physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, + create_ordering, create_physical_partitioning, create_physical_sort_expr, + create_physical_sort_exprs, physical_exprs_bag_equal, physical_exprs_contains, + physical_exprs_equal, }; pub use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, PhysicalExprRef}; diff --git a/datafusion/physical-expr/src/physical_expr.rs 
b/datafusion/physical-expr/src/physical_expr.rs index 77ede76e1daa8..6ff5be4e38229 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -21,15 +21,17 @@ use crate::expressions::{self, Column}; use crate::{LexOrdering, PhysicalSortExpr, create_physical_expr}; use arrow::compute::SortOptions; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{DFSchema, HashMap}; +use datafusion_common::{DFSchema, HashMap, ScalarValue, SplitPoint}; use datafusion_common::{Result, plan_err}; use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::{Expr, SortExpr}; +use datafusion_expr::{Expr, Partitioning as LogicalPartitioning, SortExpr}; +use datafusion_expr_common::casts::try_cast_literal_to_type; use itertools::izip; // Exports: +use crate::{Partitioning, RangePartitioning}; pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr; /// Adds the `offset` value to `Column` indices inside `expr`. This function is @@ -216,6 +218,99 @@ pub fn create_physical_sort_exprs( .collect() } +/// Create physical partitioning from logical partitioning. 
+pub fn create_physical_partitioning( + partitioning: &LogicalPartitioning, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, +) -> Result { + match partitioning { + LogicalPartitioning::RoundRobinBatch(n) => Ok(Partitioning::RoundRobinBatch(*n)), + LogicalPartitioning::Hash(exprs, partition_count) => { + let exprs = exprs + .iter() + .map(|expr| create_physical_expr(expr, input_dfschema, execution_props)) + .collect::>>()?; + Ok(Partitioning::Hash(exprs, *partition_count)) + } + LogicalPartitioning::Range(range) => { + let ordering = create_physical_sort_exprs( + range.ordering(), + input_dfschema, + execution_props, + )?; + let Some(ordering) = LexOrdering::new(ordering) else { + return plan_err!("Range partitioning requires non-empty ordering"); + }; + let split_points = normalize_range_split_points( + &ordering, + range.split_points(), + input_dfschema.as_arrow(), + )?; + let range = RangePartitioning::try_new(ordering, split_points)?; + Ok(Partitioning::Range(range)) + } + LogicalPartitioning::DistributeBy(_) => { + datafusion_common::not_impl_err!( + "Physical plan does not support DistributeBy partitioning" + ) + } + } +} + +fn normalize_range_split_points( + ordering: &LexOrdering, + split_points: &[SplitPoint], + schema: &Schema, +) -> Result> { + split_points + .iter() + .enumerate() + .map(|(split_idx, split_point)| { + let values = split_point + .values() + .iter() + .zip(ordering.iter()) + .enumerate() + .map(|(value_idx, (value, sort_expr))| { + let target_type = sort_expr.expr.data_type(schema)?; + normalize_range_split_point_value( + value, + &target_type, + split_idx, + value_idx, + ) + }) + .collect::>>()?; + Ok(SplitPoint::new(values)) + }) + .collect() +} + +fn normalize_range_split_point_value( + value: &ScalarValue, + target_type: &DataType, + split_idx: usize, + value_idx: usize, +) -> Result { + let value_type = value.data_type(); + if &value_type == target_type { + return Ok(value.clone()); + } + + if let Some(casted) = 
try_cast_literal_to_type(value, target_type) { + // Split points define physical partition boundaries, so normalization + // must reject casts that would change the advertised boundary. + if try_cast_literal_to_type(&casted, &value_type).as_ref() == Some(value) { + return Ok(casted); + } + } + + plan_err!( + "Range output partitioning split point {split_idx} value {value_idx} with type {value_type} cannot be represented exactly as ordering expression type {target_type}" + ) +} + pub fn add_offset_to_physical_sort_exprs( sort_exprs: impl IntoIterator, offset: isize,