From f8413e37e299d1f13f995fa4858495c08128972f Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 30 Dec 2025 13:18:49 +0800 Subject: [PATCH 01/10] fix: Reverse row selection should respect the row group index --- datafusion/datasource-parquet/src/opener.rs | 10 +- datafusion/datasource-parquet/src/sort.rs | 136 +++++++++++++++++--- 2 files changed, 128 insertions(+), 18 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bea970f144863..c53e92e530404 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -148,13 +148,19 @@ impl PreparedAccessPlan { mut self, file_metadata: &parquet::file::metadata::ParquetMetaData, ) -> Result { + // Get the row group indexes before reversing + let row_groups_to_scan = self.row_group_indexes.clone(); + // Reverse the row group indexes self.row_group_indexes = self.row_group_indexes.into_iter().rev().collect(); // If we have a row selection, reverse it to match the new row group order if let Some(row_selection) = self.row_selection { - self.row_selection = - Some(reverse_row_selection(&row_selection, file_metadata)?); + self.row_selection = Some(reverse_row_selection( + &row_selection, + file_metadata, + &row_groups_to_scan, // Pass the original (non-reversed) row group indexes + )?); } Ok(self) diff --git a/datafusion/datasource-parquet/src/sort.rs b/datafusion/datasource-parquet/src/sort.rs index 4255d4d6960b1..5af8aef64cb94 100644 --- a/datafusion/datasource-parquet/src/sort.rs +++ b/datafusion/datasource-parquet/src/sort.rs @@ -22,6 +22,8 @@ use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::file::metadata::ParquetMetaData; use std::collections::HashMap; +// datafusion/datasource-parquet/src/sort.rs + /// Reverse a row selection to match reversed row group order. /// /// When scanning row groups in reverse order, we need to adjust the row selection @@ -31,22 +33,34 @@ use std::collections::HashMap; /// 3. Reconstructs the row selection for the new order /// /// # Arguments -/// * `row_selection` - Original row selection +/// * `row_selection` - Original row selection (only covers row groups that are scanned) /// * `parquet_metadata` - Metadata containing row group information +/// * `row_groups_to_scan` - Indexes of row groups that will be scanned (in original order) /// /// # Returns /// A new `RowSelection` adjusted for reversed row group order +/// +/// # Important Notes +/// The input `row_selection` only covers the row groups specified in `row_groups_to_scan`. +/// Row groups that are skipped (not in `row_groups_to_scan`) are not represented in the +/// `row_selection` at all. This function needs `row_groups_to_scan` to correctly map +/// the selection back to the original row groups. pub fn reverse_row_selection( row_selection: &RowSelection, parquet_metadata: &ParquetMetaData, + row_groups_to_scan: &[usize], ) -> Result { let rg_metadata = parquet_metadata.row_groups(); - // Build a mapping of row group index to its row range in the file + // Build a mapping of row group index to its row range, but ONLY for + // the row groups that are actually being scanned. + // The row numbers in this mapping are relative to the scanned row groups, + // not the entire file. let mut rg_row_ranges: Vec<(usize, usize, usize)> = - Vec::with_capacity(rg_metadata.len()); + Vec::with_capacity(row_groups_to_scan.len()); let mut current_row = 0; - for (rg_idx, rg) in rg_metadata.iter().enumerate() { + for &rg_idx in row_groups_to_scan { + let rg = &rg_metadata[rg_idx]; let num_rows = rg.num_rows() as usize; rg_row_ranges.push((rg_idx, current_row, current_row + num_rows)); current_row += num_rows; @@ -82,12 +96,13 @@ pub fn reverse_row_selection( } // Build new selection for reversed row group order + // Only iterate over the row groups that are being scanned, in reverse order let mut reversed_selectors = Vec::new(); - for rg_idx in (0..rg_metadata.len()).rev() { + for &rg_idx in row_groups_to_scan.iter().rev() { if let Some(selectors) = rg_selections.get(&rg_idx) { reversed_selectors.extend(selectors.iter().cloned()); } else { - // No specific selection for this row group means select all + // No specific selection for this row group means select all rows in it if let Some((_, start, end)) = rg_row_ranges.iter().find(|(idx, _, _)| *idx == rg_idx) { @@ -153,7 +168,10 @@ mod tests { let selection = RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(250)]); - let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + let row_groups_to_scan = vec![0, 1, 2]; + + let reversed = + reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); // Verify total selected rows remain the same let original_selected: usize = selection @@ -181,7 +199,9 @@ mod tests { RowSelector::skip(150), ]); - let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + let row_groups_to_scan = vec![0, 1, 2]; + let reversed = + reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); // Verify total selected rows remain the same let original_selected: usize = selection @@ -205,7 +225,10 @@ mod tests { // Select all rows let selection = RowSelection::from(vec![RowSelector::select(300)]); - let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + let row_groups_to_scan = vec![0, 1, 2]; + + let reversed = + reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); // Should still select all rows, just in reversed row group order let total_selected: usize = reversed @@ -224,7 +247,10 @@ mod tests { // Skip all rows let selection = RowSelection::from(vec![RowSelector::skip(300)]); - let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + let row_groups_to_scan = vec![0, 1, 2]; + + let reversed = + reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); // Should still skip all rows let total_selected: usize = reversed @@ -246,7 +272,10 @@ mod tests { RowSelector::skip(75), ]); - let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + let row_groups_to_scan = vec![0, 1, 2]; + + let reversed = + reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); let original_selected: usize = selection .iter() @@ -269,7 +298,10 @@ mod tests { let selection = RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]); - let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + let row_groups_to_scan = vec![0]; + + let reversed = + reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); // With single row group, selection should remain the same let original_selected: usize = selection @@ -299,7 +331,10 @@ mod tests { RowSelector::select(100), ]); - let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + let row_groups_to_scan = vec![0, 1, 2]; + + let reversed = + reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); let original_selected: usize = selection .iter() @@ -328,7 +363,10 @@ mod tests { RowSelector::select(50), // Last 50 of RG2 ]); - let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + let row_groups_to_scan = vec![0, 1, 2]; + + let reversed = + reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); // Verify total selected rows remain the same let original_selected: usize = selection @@ -358,7 +396,10 @@ mod tests { RowSelector::skip(100), // Skip RG2 ]); - let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + let row_groups_to_scan = vec![0, 1, 2]; + + let reversed = + reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); let original_selected: usize = selection .iter() @@ -388,7 +429,10 @@ mod tests { RowSelector::skip(100), // RG3 ]); - let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + let row_groups_to_scan = vec![0, 1, 2]; + + let reversed = + reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); let original_selected: usize = selection .iter() @@ -404,4 +448,64 @@ mod tests { assert_eq!(original_selected, reversed_selected); assert_eq!(original_selected, 200); } + + #[test] + fn test_reverse_with_skipped_row_groups() { + // This is the key test case for the bug fix + let metadata = create_test_metadata(vec![100, 100, 100, 100]); + + // Scenario: RG0 (scan all), RG1 (completely skipped), RG2 (partial), RG3 (scan all) + // The row selection only covers RG0, RG2, RG3 (300 rows total) + let selection = RowSelection::from(vec![ + RowSelector::select(100), // RG0: all 100 rows + RowSelector::select(25), // RG2: select first 25 rows + RowSelector::skip(75), // RG2: skip last 75 rows + RowSelector::select(100), // RG3: all 100 rows + ]); + + // Only scanning RG0, RG2, RG3 (RG1 is not in the scan plan) + let row_groups_to_scan = vec![0, 2, 3]; + let reversed = + reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + + // Verify total selected rows remain the same + let original_selected: usize = selection + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + let reversed_selected: usize = reversed + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!(original_selected, 225); // 100 + 25 + 100 + assert_eq!(reversed_selected, 225); + + // Verify the reversed selection structure + // After reversal, the order becomes: RG3, RG2, RG0 + // - RG3: select(100) + // - RG2: select(25), skip(75) (note: internal order preserved, not reversed) + // - RG0: select(100) + // + // After RowSelection::from() merges adjacent selectors of the same type: + // - RG3's select(100) + RG2's select(25) = select(125) + // - RG2's skip(75) remains as skip(75) + // - RG0's select(100) remains as select(100) + let selectors: Vec<_> = reversed.iter().collect(); + assert_eq!(selectors.len(), 3); + + // RG3 (100) + RG2 first part (25) merged into select(125) + assert!(!selectors[0].skip); + assert_eq!(selectors[0].row_count, 125); + + // RG2: skip last 75 rows + assert!(selectors[1].skip); + assert_eq!(selectors[1].row_count, 75); + + // RG0: select all 100 rows + assert!(!selectors[2].skip); + assert_eq!(selectors[2].row_count, 100); + } } From 0ad280ff73b12877c91450cb23620dac24cee456 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 30 Dec 2025 13:24:43 +0800 Subject: [PATCH 02/10] fix --- datafusion/datasource-parquet/src/sort.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/sort.rs b/datafusion/datasource-parquet/src/sort.rs index 5af8aef64cb94..96c6256d793f1 100644 --- a/datafusion/datasource-parquet/src/sort.rs +++ b/datafusion/datasource-parquet/src/sort.rs @@ -22,8 +22,6 @@ use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::file::metadata::ParquetMetaData; use std::collections::HashMap; -// datafusion/datasource-parquet/src/sort.rs - /// Reverse a row selection to match reversed row group order. /// /// When scanning row groups in reverse order, we need to adjust the row selection From 717b65226018581772ceba34789771b4bca1ab56 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 30 Dec 2025 13:48:18 +0800 Subject: [PATCH 03/10] add slt test --- .../sqllogictest/test_files/sort_pushdown.slt | 185 ++++++++++++++++++ 1 file changed, 185 insertions(+) diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 58d9915a24be2..4f85cfadaa1da 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -884,3 +884,188 @@ DROP TABLE signed_parquet; statement ok SET datafusion.optimizer.enable_sort_pushdown = true; + +# Test 1.8: Reverse scan with non-contiguous row groups and row selection +# This is the key bug fix scenario: when some row groups are skipped (not in scan plan), +# the row selection must still be correctly reversed + +# Create data with 4 row groups, but we'll only scan 3 of them (skipping RG1) +statement ok +CREATE TABLE non_contiguous_rg_data(id INT, category VARCHAR, value INT) AS VALUES +(1, 'alpha', 10), +(2, 'alpha', 20), +(3, 'beta', 30), +(4, 'beta', 40), +(5, 'gamma', 50), +(6, 'gamma', 60), +(7, 'delta', 70), +(8, 'delta', 80); + +# Write with small row groups (2 rows each = 4 row groups: RG0, RG1, RG2, RG3) +statement ok +SET datafusion.execution.parquet.max_row_group_size = 2; + +query I +COPY (SELECT * FROM non_contiguous_rg_data ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/non_contiguous_rg.parquet'; +---- +8 + +# Reset row group size +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE non_contiguous_rg_parquet(id INT, category VARCHAR, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/non_contiguous_rg.parquet' +WITH ORDER (id ASC); + +# Enable page index and pushdown filters to create row selection +statement ok +SET datafusion.execution.parquet.enable_page_index = true; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +# Test case: Filter that skips RG1 entirely (beta category) +# - RG0 (id=1,2): category=alpha -> SELECT (via filter) +# - RG1 (id=3,4): category=beta -> SKIP (not in scan plan due to filter) +# - RG2 (id=5,6): category=gamma -> SELECT (via filter) +# - RG3 (id=7,8): category=delta -> SELECT (via filter) +# +# This creates a scenario where: +# 1. Row groups to scan: [0, 2, 3] (non-contiguous! RG1 is skipped) +# 2. Row selection exists for these row groups +# 3. When reversed, we need to correctly map selections to their row groups + +query TT +EXPLAIN SELECT * FROM non_contiguous_rg_parquet +WHERE category IN ('alpha', 'gamma', 'delta') +ORDER BY id DESC; +---- +logical_plan +01)Sort: non_contiguous_rg_parquet.id DESC NULLS FIRST +02)--Filter: non_contiguous_rg_parquet.category = Utf8View("alpha") OR non_contiguous_rg_parquet.category = Utf8View("gamma") OR non_contiguous_rg_parquet.category = Utf8View("delta") +03)----TableScan: non_contiguous_rg_parquet projection=[id, category, value], partial_filters=[non_contiguous_rg_parquet.category = Utf8View("alpha") OR non_contiguous_rg_parquet.category = Utf8View("gamma") OR non_contiguous_rg_parquet.category = Utf8View("delta")] +physical_plan +01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/non_contiguous_rg.parquet]]}, projection=[id, category, value], file_type=parquet, predicate=category@1 = alpha OR category@1 = gamma OR category@1 = delta, reverse_row_groups=true, pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1 OR category_null_count@2 != row_count@3 AND category_min@0 <= gamma AND gamma <= category_max@1 OR category_null_count@2 != row_count@3 AND category_min@0 <= delta AND delta <= category_max@1, required_guarantees=[category in (alpha, delta, gamma)] + +# Verify the results are correct with non-contiguous row groups +# Expected order (DESC): delta rows (8,7), gamma rows (6,5), alpha rows (2,1) +query ITI +SELECT * FROM non_contiguous_rg_parquet +WHERE category IN ('alpha', 'gamma', 'delta') +ORDER BY id DESC; +---- +8 delta 80 +7 delta 70 +6 gamma 60 +5 gamma 50 +2 alpha 20 +1 alpha 10 + +# Test another pattern: skip first and last row groups +# Only scan RG1 and RG2 +query ITI +SELECT * FROM non_contiguous_rg_parquet +WHERE category IN ('beta', 'gamma') +ORDER BY id DESC; +---- +6 gamma 60 +5 gamma 50 +4 beta 40 +3 beta 30 + +# Test forward scan for comparison (should give same results in ASC order) +query ITI +SELECT * FROM non_contiguous_rg_parquet +WHERE category IN ('alpha', 'gamma', 'delta') +ORDER BY id ASC; +---- +1 alpha 10 +2 alpha 20 +5 gamma 50 +6 gamma 60 +7 delta 70 +8 delta 80 + +# More complex case: Create a filter that results in partial row selection within some row groups +# This will test the row selection reversal logic more thoroughly +statement ok +CREATE TABLE complex_rg_data(id INT, category VARCHAR, subcategory VARCHAR, value INT) AS VALUES +-- RG0 +(1, 'A', 'x', 100), +(2, 'A', 'y', 200), +-- RG1 (will be completely skipped by filter) +(3, 'B', 'x', 300), +(4, 'B', 'y', 400), +-- RG2 (partial selection: only first row matches) +(5, 'C', 'x', 500), +(6, 'C', 'z', 600), +-- RG3 +(7, 'D', 'x', 700), +(8, 'D', 'y', 800); + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 2; + +query I +COPY (SELECT * FROM complex_rg_data ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/complex_rg.parquet'; +---- +8 + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE complex_rg_parquet(id INT, category VARCHAR, subcategory VARCHAR, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/complex_rg.parquet' +WITH ORDER (id ASC); + +# Filter that creates partial row selection: +# - RG0: both rows match (category A, subcategory x or y) +# - RG1: no rows match (category B is excluded) +# - RG2: only first row matches (category C AND subcategory x) +# - RG3: both rows match (category D, subcategory x or y) +# +# Row groups scanned: [0, 2, 3] (non-contiguous) +# Row selection for RG2: select(1), skip(1) +query ITTI +SELECT * FROM complex_rg_parquet +WHERE category IN ('A', 'C', 'D') AND subcategory IN ('x', 'y') +ORDER BY id DESC; +---- +8 D y 800 +7 D x 700 +5 C x 500 +2 A y 200 +1 A x 100 + +# Verify forward scan gives same results in ASC order +query ITTI +SELECT * FROM complex_rg_parquet +WHERE category IN ('A', 'C', 'D') AND subcategory IN ('x', 'y') +ORDER BY id ASC; +---- +1 A x 100 +2 A y 200 +5 C x 500 +7 D x 700 +8 D y 800 + +# Cleanup +statement ok +DROP TABLE non_contiguous_rg_data; + +statement ok +DROP TABLE non_contiguous_rg_parquet; + +statement ok +DROP TABLE complex_rg_data; + +statement ok +DROP TABLE complex_rg_parquet; \ No newline at end of file From 66db91284b450ac03928de47c59025865e26ab41 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 30 Dec 2025 14:14:09 +0800 Subject: [PATCH 04/10] Address review comments --- datafusion/datasource-parquet/src/sort.rs | 67 +++++++++++++++-------- 1 file changed, 45 insertions(+), 22 deletions(-) diff --git a/datafusion/datasource-parquet/src/sort.rs b/datafusion/datasource-parquet/src/sort.rs index 96c6256d793f1..b5cdf45641d14 100644 --- a/datafusion/datasource-parquet/src/sort.rs +++ b/datafusion/datasource-parquet/src/sort.rs @@ -52,8 +52,14 @@ pub fn reverse_row_selection( // Build a mapping of row group index to its row range, but ONLY for // the row groups that are actually being scanned. - // The row numbers in this mapping are relative to the scanned row groups, - // not the entire file. + // + // IMPORTANT: The row numbers in this mapping are RELATIVE to the scanned row groups, + // not absolute positions in the file. + // + // Example: If row_groups_to_scan = [0, 2, 3] and each has 100 rows: + // RG0: rows 0-99 (relative to scanned data) + // RG2: rows 100-199 (relative to scanned data, NOT 200-299 in file!) + // RG3: rows 200-299 (relative to scanned data, NOT 300-399 in file!) let mut rg_row_ranges: Vec<(usize, usize, usize)> = Vec::with_capacity(row_groups_to_scan.len()); let mut current_row = 0; @@ -61,7 +67,7 @@ pub fn reverse_row_selection( let rg = &rg_metadata[rg_idx]; let num_rows = rg.num_rows() as usize; rg_row_ranges.push((rg_idx, current_row, current_row + num_rows)); - current_row += num_rows; + current_row += num_rows; // This is relative row number, NOT absolute file position } // Map selections to row groups @@ -131,9 +137,8 @@ mod tests { // Create in-memory parquet file with the specified row groups let mut buffer = Vec::new(); { - let props = parquet::file::properties::WriterProperties::builder() - .set_max_row_group_size(row_group_sizes[0] as usize) - .build(); + // Don't set max_row_group_size - we'll control it by writing separate batches + let props = parquet::file::properties::WriterProperties::builder().build(); let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); @@ -147,6 +152,8 @@ mod tests { ) .unwrap(); writer.write(&batch).unwrap(); + // Force flush to create a new row group + writer.flush().unwrap(); } writer.close().unwrap(); } @@ -157,6 +164,20 @@ mod tests { reader.metadata().clone() } + /// Test helper: Reverse a row selection for given row groups + /// + /// This helper makes tests more readable by clearly showing: + /// - Which row groups are being scanned + /// - What the original selection is + /// - What the reversed selection should be + fn reverse_access_plan( + row_selection: RowSelection, + metadata: &ParquetMetaData, + row_groups_to_scan: &[usize], + ) -> RowSelection { + reverse_row_selection(&row_selection, metadata, row_groups_to_scan).unwrap() + } + #[test] fn test_reverse_simple_selection() { // 3 row groups with 100 rows each @@ -166,10 +187,11 @@ mod tests { let selection = RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(250)]); + // Scanning all 3 row groups let row_groups_to_scan = vec![0, 1, 2]; let reversed = - reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); // Verify total selected rows remain the same let original_selected: usize = selection @@ -199,7 +221,7 @@ mod tests { let row_groups_to_scan = vec![0, 1, 2]; let reversed = - reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); // Verify total selected rows remain the same let original_selected: usize = selection @@ -225,8 +247,7 @@ mod tests { let row_groups_to_scan = vec![0, 1, 2]; - let reversed = - reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + let reversed = reverse_access_plan(selection, &metadata, &row_groups_to_scan); // Should still select all rows, just in reversed row group order let total_selected: usize = reversed @@ -247,8 +268,7 @@ mod tests { let row_groups_to_scan = vec![0, 1, 2]; - let reversed = - reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + let reversed = reverse_access_plan(selection, &metadata, &row_groups_to_scan); // Should still skip all rows let total_selected: usize = reversed @@ -273,7 +293,7 @@ mod tests { let row_groups_to_scan = vec![0, 1, 2]; let reversed = - reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); let original_selected: usize = selection .iter() @@ -296,10 +316,11 @@ mod tests { let selection = RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]); + // Only scanning the single row group (RG0) let row_groups_to_scan = vec![0]; let reversed = - reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); // With single row group, selection should remain the same let original_selected: usize = selection @@ -332,7 +353,7 @@ mod tests { let row_groups_to_scan = vec![0, 1, 2]; let reversed = - reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); let original_selected: usize = selection .iter() @@ -351,7 +372,7 @@ mod tests { #[test] fn test_reverse_with_skipped_row_group() { - // This test covers the "no specific selection" code path (lines 90-95) + // This test covers the "no specific selection" code path let metadata = create_test_metadata(vec![100, 100, 100]); // Select only from first and third row groups, skip middle one entirely @@ -364,7 +385,7 @@ mod tests { let row_groups_to_scan = vec![0, 1, 2]; let reversed = - reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); // Verify total selected rows remain the same let original_selected: usize = selection @@ -397,7 +418,7 @@ mod tests { let row_groups_to_scan = vec![0, 1, 2]; let reversed = - reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); let original_selected: usize = selection .iter() @@ -417,20 +438,22 @@ mod tests { #[test] fn test_reverse_alternating_row_groups() { // Test with more complex skipping pattern + // File has 4 row groups, but we only scan first 3 let metadata = create_test_metadata(vec![100, 100, 100, 100]); - // Select first and third row groups, skip second and fourth + // Select first and third row groups, skip second + // Note: Selection only covers first 3 row groups (300 rows) let selection = RowSelection::from(vec![ RowSelector::select(100), // RG0 RowSelector::skip(100), // RG1 RowSelector::select(100), // RG2 - RowSelector::skip(100), // RG3 ]); + // Only scanning first 3 row groups let row_groups_to_scan = vec![0, 1, 2]; let reversed = - reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); let original_selected: usize = selection .iter() @@ -464,7 +487,7 @@ mod tests { // Only scanning RG0, RG2, RG3 (RG1 is not in the scan plan) let row_groups_to_scan = vec![0, 2, 3]; let reversed = - reverse_row_selection(&selection, &metadata, &row_groups_to_scan).unwrap(); + reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); // Verify total selected rows remain the same let original_selected: usize = selection From 6ada9ddd1d2f45f5c5ff94ca46ab21677ba5420e Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 30 Dec 2025 14:37:36 +0800 Subject: [PATCH 05/10] remove slt test --- .../sqllogictest/test_files/sort_pushdown.slt | 185 ------------------ 1 file changed, 185 deletions(-) diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 4f85cfadaa1da..58d9915a24be2 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -884,188 +884,3 @@ DROP TABLE signed_parquet; statement ok SET datafusion.optimizer.enable_sort_pushdown = true; - -# Test 1.8: Reverse scan with non-contiguous row groups and row selection -# This is the key bug fix scenario: when some row groups are skipped (not in scan plan), -# the row selection must still be correctly reversed - -# Create data with 4 row groups, but we'll only scan 3 of them (skipping RG1) -statement ok -CREATE TABLE non_contiguous_rg_data(id INT, category VARCHAR, value INT) AS VALUES -(1, 'alpha', 10), -(2, 'alpha', 20), -(3, 'beta', 30), -(4, 'beta', 40), -(5, 'gamma', 50), -(6, 'gamma', 60), -(7, 'delta', 70), -(8, 'delta', 80); - -# Write with small row groups (2 rows each = 4 row groups: RG0, RG1, RG2, RG3) -statement ok -SET datafusion.execution.parquet.max_row_group_size = 2; - -query I -COPY (SELECT * FROM non_contiguous_rg_data ORDER BY id ASC) -TO 'test_files/scratch/sort_pushdown/non_contiguous_rg.parquet'; ----- -8 - -# Reset row group size -statement ok -SET datafusion.execution.parquet.max_row_group_size = 1048576; - -statement ok -CREATE EXTERNAL TABLE non_contiguous_rg_parquet(id INT, category VARCHAR, value INT) -STORED AS PARQUET -LOCATION 'test_files/scratch/sort_pushdown/non_contiguous_rg.parquet' -WITH ORDER (id ASC); - -# Enable page index and pushdown filters to create row selection -statement ok -SET datafusion.execution.parquet.enable_page_index = true; - -statement ok -SET datafusion.execution.parquet.pushdown_filters = true; - -# Test case: Filter that skips RG1 entirely (beta category) -# - RG0 (id=1,2): category=alpha -> SELECT (via filter) -# - RG1 (id=3,4): category=beta -> SKIP (not in scan plan due to filter) -# - RG2 (id=5,6): category=gamma -> SELECT (via filter) -# - RG3 (id=7,8): category=delta -> SELECT (via filter) -# -# This creates a scenario where: -# 1. Row groups to scan: [0, 2, 3] (non-contiguous! RG1 is skipped) -# 2. Row selection exists for these row groups -# 3. When reversed, we need to correctly map selections to their row groups - -query TT -EXPLAIN SELECT * FROM non_contiguous_rg_parquet -WHERE category IN ('alpha', 'gamma', 'delta') -ORDER BY id DESC; ----- -logical_plan -01)Sort: non_contiguous_rg_parquet.id DESC NULLS FIRST -02)--Filter: non_contiguous_rg_parquet.category = Utf8View("alpha") OR non_contiguous_rg_parquet.category = Utf8View("gamma") OR non_contiguous_rg_parquet.category = Utf8View("delta") -03)----TableScan: non_contiguous_rg_parquet projection=[id, category, value], partial_filters=[non_contiguous_rg_parquet.category = Utf8View("alpha") OR non_contiguous_rg_parquet.category = Utf8View("gamma") OR non_contiguous_rg_parquet.category = Utf8View("delta")] -physical_plan -01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/non_contiguous_rg.parquet]]}, projection=[id, category, value], file_type=parquet, predicate=category@1 = alpha OR category@1 = gamma OR category@1 = delta, reverse_row_groups=true, pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1 OR category_null_count@2 != row_count@3 AND category_min@0 <= gamma AND gamma <= category_max@1 OR category_null_count@2 != row_count@3 AND category_min@0 <= delta AND delta <= category_max@1, required_guarantees=[category in (alpha, delta, gamma)] - -# Verify the results are correct with non-contiguous row groups -# Expected order (DESC): delta rows (8,7), gamma rows (6,5), alpha rows (2,1) -query ITI -SELECT * FROM non_contiguous_rg_parquet -WHERE category IN ('alpha', 'gamma', 'delta') -ORDER BY id DESC; ----- -8 delta 80 -7 delta 70 -6 gamma 60 -5 gamma 50 -2 alpha 20 -1 alpha 10 - -# Test another pattern: skip first and last row groups -# Only scan RG1 and RG2 -query ITI -SELECT * FROM non_contiguous_rg_parquet -WHERE category IN ('beta', 'gamma') -ORDER BY id DESC; ----- -6 gamma 60 -5 gamma 50 -4 beta 40 -3 beta 30 - -# Test forward scan for comparison (should give same results in ASC order) -query ITI -SELECT * FROM non_contiguous_rg_parquet -WHERE category IN ('alpha', 'gamma', 'delta') -ORDER BY id ASC; ----- -1 alpha 10 -2 alpha 20 -5 gamma 50 -6 gamma 60 -7 delta 70 -8 delta 80 - -# More complex case: Create a filter that results in partial row selection within some row groups -# This will test the row selection reversal logic more thoroughly -statement ok -CREATE TABLE complex_rg_data(id INT, category VARCHAR, subcategory VARCHAR, value INT) AS VALUES --- RG0 -(1, 'A', 'x', 100), -(2, 'A', 'y', 200), --- RG1 (will be completely skipped by filter) -(3, 'B', 'x', 300), -(4, 'B', 'y', 400), --- RG2 (partial selection: only first row matches) -(5, 'C', 'x', 500), -(6, 'C', 'z', 600), --- RG3 -(7, 'D', 'x', 700), -(8, 'D', 'y', 800); - -statement ok -SET datafusion.execution.parquet.max_row_group_size = 2; - -query I -COPY (SELECT * FROM complex_rg_data ORDER BY id ASC) -TO 'test_files/scratch/sort_pushdown/complex_rg.parquet'; ----- -8 - -statement ok -SET datafusion.execution.parquet.max_row_group_size = 1048576; - -statement ok -CREATE EXTERNAL TABLE complex_rg_parquet(id INT, category VARCHAR, subcategory VARCHAR, value INT) -STORED AS PARQUET -LOCATION 'test_files/scratch/sort_pushdown/complex_rg.parquet' -WITH ORDER (id ASC); - -# Filter that creates partial row selection: -# - RG0: both rows match (category A, subcategory x or y) -# - RG1: no rows match (category B is excluded) -# - RG2: only first row matches (category C AND subcategory x) -# - RG3: both rows match (category D, subcategory x or y) -# -# Row groups scanned: [0, 2, 3] (non-contiguous) -# Row selection for RG2: select(1), skip(1) -query ITTI -SELECT * FROM complex_rg_parquet -WHERE category IN ('A', 'C', 'D') AND subcategory IN ('x', 'y') -ORDER BY id DESC; ----- -8 D y 800 -7 D x 700 -5 C x 500 -2 A y 200 -1 A x 100 - -# Verify forward scan gives same results in ASC order -query ITTI -SELECT * FROM complex_rg_parquet -WHERE category IN ('A', 'C', 'D') AND subcategory IN ('x', 'y') -ORDER BY id ASC; ----- -1 A x 100 -2 A y 200 -5 C x 500 -7 D x 700 -8 D y 800 - -# Cleanup -statement ok -DROP TABLE non_contiguous_rg_data; - -statement ok -DROP TABLE non_contiguous_rg_parquet; - -statement ok -DROP TABLE complex_rg_data; - -statement ok -DROP TABLE complex_rg_parquet; \ No newline at end of file From 033ac62329373df85772547eb34fda38539e1d35 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 30 Dec 2025 14:46:11 +0800 Subject: [PATCH 06/10] Add accurate test --- datafusion/datasource-parquet/src/opener.rs | 99 ++++++++++++++++++++- 1 file changed, 98 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index c53e92e530404..a983caa376104 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -970,7 +970,7 @@ mod test { use std::sync::Arc; use super::{ConstantColumns, constant_columns_from_stats}; - use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener}; + use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener, RowGroupAccess}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ @@ -1857,4 +1857,101 @@ mod test { "Reverse scan should reverse row group order while maintaining correct RowSelection for each group" ); } + + #[tokio::test] + async fn test_reverse_scan_with_non_contiguous_row_groups() { + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + + // Create 4 batches (4 row groups) + let batch0 = record_batch!(("a", Int32, vec![Some(1), Some(2)])).unwrap(); + let batch1 = record_batch!(("a", Int32, vec![Some(3), Some(4)])).unwrap(); + let batch2 = record_batch!(("a", Int32, vec![Some(5), Some(6)])).unwrap(); + let batch3 = record_batch!(("a", Int32, vec![Some(7), Some(8)])).unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_size(2) + .build(); + + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch0.clone(), batch1, batch2, batch3], + Some(props), + ) + .await; + + let schema = batch0.schema(); + + use crate::ParquetAccessPlan; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + + // KEY: Skip RG1 (non-contiguous!) + // Only scan row groups: [0, 2, 3] + let mut access_plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Scan, // RG0 + RowGroupAccess::Skip, // RG1 - SKIPPED! + RowGroupAccess::Scan, // RG2 + RowGroupAccess::Scan, // RG3 + ]); + + // Add RowSelection for each scanned row group + // RG0: select first row (1), skip second (2) + access_plan.scan_selection( + 0, + RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]), + ); + // RG1: skipped, no selection needed + // RG2: select first row (5), skip second (6) + access_plan.scan_selection( + 2, + RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]), + ); + // RG3: select first row (7), skip second (8) + access_plan.scan_selection( + 3, + RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]), + ); + + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ) + .with_extensions(Arc::new(access_plan)); + + let make_opener = |reverse_scan: bool| { + ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_reverse_row_groups(reverse_scan) + .build() + }; + + // Forward scan: RG0(1), RG2(5), RG3(7) + // Note: RG1 is completely skipped + let opener = make_opener(false); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let forward_values = collect_int32_values(stream).await; + + assert_eq!( + forward_values, + vec![1, 5, 7], + "Forward scan with non-contiguous row groups" + ); + + // Reverse scan: RG3(7), RG2(5), RG0(1) + // WITHOUT the bug fix, this would return WRONG values + // because the RowSelection would be incorrectly mapped + let opener = make_opener(true); + let stream = opener.open(file).unwrap().await.unwrap(); + let reverse_values = collect_int32_values(stream).await; + + assert_eq!( + reverse_values, + vec![7, 5, 1], + "Reverse scan with non-contiguous row groups should correctly map RowSelection" + ); + } } From fb6b8360fa8996b3b3a8505ca91972da16baacef Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 30 Dec 2025 14:47:13 +0800 Subject: [PATCH 07/10] Add accurate test --- datafusion/datasource-parquet/src/opener.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index a983caa376104..694087326eaee 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -970,7 +970,7 @@ mod test { use std::sync::Arc; use super::{ConstantColumns, constant_columns_from_stats}; - use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener, RowGroupAccess}; + use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ @@ -1880,7 +1880,7 @@ mod test { vec![batch0.clone(), batch1, batch2, batch3], Some(props), ) - .await; + .await; let schema = batch0.schema(); @@ -1890,10 +1890,10 @@ mod test { // KEY: Skip RG1 (non-contiguous!) // Only scan row groups: [0, 2, 3] let mut access_plan = ParquetAccessPlan::new(vec![ - RowGroupAccess::Scan, // RG0 - RowGroupAccess::Skip, // RG1 - SKIPPED! - RowGroupAccess::Scan, // RG2 - RowGroupAccess::Scan, // RG3 + RowGroupAccess::Scan, // RG0 + RowGroupAccess::Skip, // RG1 - SKIPPED! + RowGroupAccess::Scan, // RG2 + RowGroupAccess::Scan, // RG3 ]); // Add RowSelection for each scanned row group @@ -1918,7 +1918,7 @@ mod test { "test.parquet".to_string(), u64::try_from(data_len).unwrap(), ) - .with_extensions(Arc::new(access_plan)); + .with_extensions(Arc::new(access_plan)); let make_opener = |reverse_scan: bool| { ParquetOpenerBuilder::new() From 3c14c828c3d6ec8bdf372cffe43764f7e1cb2569 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 30 Dec 2025 22:12:21 +0800 Subject: [PATCH 08/10] Use 100% real case to test --- datafusion/datasource-parquet/src/opener.rs | 10 +- datafusion/datasource-parquet/src/sort.rs | 913 +++++++++++++++----- 2 files changed, 716 insertions(+), 207 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 694087326eaee..83bdf79c8fcc0 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -121,16 +121,16 @@ pub(super) struct ParquetOpener { } /// Represents a prepared access plan with optional row selection -struct PreparedAccessPlan { +pub(crate) struct PreparedAccessPlan { /// Row group indexes to read - row_group_indexes: Vec, + pub(crate) row_group_indexes: Vec, /// Optional row selection for filtering within row groups - row_selection: Option, + pub(crate) row_selection: Option, } impl PreparedAccessPlan { /// Create a new prepared access plan from a ParquetAccessPlan - fn from_access_plan( + pub(crate) fn from_access_plan( access_plan: ParquetAccessPlan, rg_metadata: &[RowGroupMetaData], ) -> Result { @@ -144,7 +144,7 @@ impl PreparedAccessPlan { } /// Reverse the access plan for reverse scanning - fn reverse( + pub(crate) fn reverse( mut self, file_metadata: &parquet::file::metadata::ParquetMetaData, ) -> Result { diff --git a/datafusion/datasource-parquet/src/sort.rs b/datafusion/datasource-parquet/src/sort.rs index b5cdf45641d14..4be2672133ce8 100644 --- a/datafusion/datasource-parquet/src/sort.rs +++ b/datafusion/datasource-parquet/src/sort.rs @@ -120,9 +120,12 @@ pub fn reverse_row_selection( #[cfg(test)] mod tests { - use super::*; + use crate::opener::PreparedAccessPlan; + use crate::ParquetAccessPlan; + use crate::RowGroupAccess; use arrow::datatypes::{DataType, Field, Schema}; use bytes::Bytes; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::arrow::ArrowWriter; use parquet::file::reader::FileReader; use parquet::file::serialized_reader::SerializedFileReader; @@ -130,106 +133,156 @@ mod tests { /// Helper function to create a ParquetMetaData with specified row group sizes /// by actually writing a parquet file in memory - fn create_test_metadata(row_group_sizes: Vec) -> ParquetMetaData { - // Create a simple schema + fn create_test_metadata(row_group_sizes: Vec) -> parquet::file::metadata::ParquetMetaData { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); - - // Create in-memory parquet file with the specified row groups let mut buffer = Vec::new(); { - // Don't set max_row_group_size - we'll control it by writing separate batches let props = parquet::file::properties::WriterProperties::builder().build(); - let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); for &size in &row_group_sizes { - // Create a batch with the specified number of rows let array = arrow::array::Int32Array::from(vec![1; size as usize]); let batch = arrow::record_batch::RecordBatch::try_new( schema.clone(), vec![Arc::new(array)], ) - .unwrap(); + .unwrap(); writer.write(&batch).unwrap(); - // Force flush to create a new row group writer.flush().unwrap(); } writer.close().unwrap(); } - // Read back the metadata let bytes = Bytes::from(buffer); let reader = SerializedFileReader::new(bytes).unwrap(); reader.metadata().clone() } - /// Test helper: Reverse a row selection for given row groups - /// - /// This helper makes tests more readable by clearly showing: - /// - Which row groups are being scanned - /// - What the original selection is - /// - What the reversed selection should be - fn reverse_access_plan( - row_selection: RowSelection, - metadata: &ParquetMetaData, - row_groups_to_scan: &[usize], - ) -> RowSelection { - reverse_row_selection(&row_selection, metadata, row_groups_to_scan).unwrap() - } - #[test] - fn test_reverse_simple_selection() { - // 3 row groups with 100 rows each + fn test_prepared_access_plan_reverse_simple() { + // Test: all row groups are scanned, no row selection let metadata = create_test_metadata(vec![100, 100, 100]); - // Select first 50 rows from first row group - let selection = - RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(250)]); + let access_plan = ParquetAccessPlan::new_all(3); + let rg_metadata = metadata.row_groups(); + + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); + + // Verify original plan + assert_eq!(prepared_plan.row_group_indexes, vec![0, 1, 2]); - // Scanning all 3 row groups - let row_groups_to_scan = vec![0, 1, 2]; + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); - let reversed = - reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); + // Verify row groups are reversed + assert_eq!(reversed_plan.row_group_indexes, vec![2, 1, 0]); - // Verify total selected rows remain the same - let original_selected: usize = selection + // If no selection originally, after reversal should still select all rows + if let Some(selection) = reversed_plan.row_selection { + let total_selected: usize = selection + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + assert_eq!(total_selected, 300); + } + } + + #[test] + fn test_prepared_access_plan_reverse_with_selection() { + // Test: simple row selection that spans multiple row groups + let metadata = create_test_metadata(vec![100, 100, 100]); + + let mut access_plan = ParquetAccessPlan::new_all(3); + + // Select first 50 rows from first row group, skip rest + access_plan.scan_selection( + 0, + RowSelection::from(vec![ + RowSelector::select(50), + RowSelector::skip(50), + ]), + ); + + let rg_metadata = metadata.row_groups(); + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); + + let original_selected: usize = prepared_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - let reversed_selected: usize = reversed + + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); + + let reversed_selected: usize = reversed_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - assert_eq!(original_selected, reversed_selected); + assert_eq!( + original_selected, reversed_selected, + "Total selected rows should remain the same" + ); } #[test] - fn test_reverse_multi_row_group_selection() { + fn test_prepared_access_plan_reverse_multi_row_group_selection() { + // Test: row selection spanning multiple row groups let metadata = create_test_metadata(vec![100, 100, 100]); - // Select rows spanning multiple row groups - let selection = RowSelection::from(vec![ - RowSelector::skip(50), - RowSelector::select(100), // Spans RG0 and RG1 - RowSelector::skip(150), - ]); - - let row_groups_to_scan = vec![0, 1, 2]; - let reversed = - reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); - - // Verify total selected rows remain the same - let original_selected: usize = selection + let mut access_plan = ParquetAccessPlan::new_all(3); + + // Create selection that spans RG0 and RG1 + access_plan.scan_selection( + 0, + RowSelection::from(vec![ + RowSelector::skip(50), + RowSelector::select(50), + ]), + ); + access_plan.scan_selection( + 1, + RowSelection::from(vec![ + RowSelector::select(50), + RowSelector::skip(50), + ]), + ); + + let rg_metadata = metadata.row_groups(); + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); + + let original_selected: usize = prepared_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - let reversed_selected: usize = reversed + + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); + + let reversed_selected: usize = reversed_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) @@ -239,193 +292,413 @@ mod tests { } #[test] - fn test_reverse_full_selection() { + fn test_prepared_access_plan_reverse_empty_selection() { + // Test: all rows are skipped let metadata = create_test_metadata(vec![100, 100, 100]); - // Select all rows - let selection = RowSelection::from(vec![RowSelector::select(300)]); + let mut access_plan = ParquetAccessPlan::new_all(3); + + // Skip all rows in all row groups + for i in 0..3 { + access_plan.scan_selection( + i, + RowSelection::from(vec![RowSelector::skip(100)]), + ); + } - let row_groups_to_scan = vec![0, 1, 2]; + let rg_metadata = metadata.row_groups(); + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); - let reversed = reverse_access_plan(selection, &metadata, &row_groups_to_scan); + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); - // Should still select all rows, just in reversed row group order - let total_selected: usize = reversed + // Should still skip all rows + let total_selected: usize = reversed_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - assert_eq!(total_selected, 300); + assert_eq!(total_selected, 0); } #[test] - fn test_reverse_empty_selection() { - let metadata = create_test_metadata(vec![100, 100, 100]); - - // Skip all rows - let selection = RowSelection::from(vec![RowSelector::skip(300)]); + fn test_prepared_access_plan_reverse_different_row_group_sizes() { + // Test: row groups with different sizes + let metadata = create_test_metadata(vec![50, 150, 100]); - let row_groups_to_scan = vec![0, 1, 2]; + let mut access_plan = ParquetAccessPlan::new_all(3); + + // Create complex selection pattern + access_plan.scan_selection( + 0, + RowSelection::from(vec![ + RowSelector::skip(25), + RowSelector::select(25), + ]), + ); + access_plan.scan_selection( + 1, + RowSelection::from(vec![ + RowSelector::select(150), + ]), + ); + access_plan.scan_selection( + 2, + RowSelection::from(vec![ + RowSelector::select(50), + RowSelector::skip(50), + ]), + ); + + let rg_metadata = metadata.row_groups(); + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); + + let original_selected: usize = prepared_plan + .row_selection + .as_ref() + .unwrap() + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); - let reversed = reverse_access_plan(selection, &metadata, &row_groups_to_scan); + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); - // Should still skip all rows - let total_selected: usize = reversed + let reversed_selected: usize = reversed_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - assert_eq!(total_selected, 0); + assert_eq!(original_selected, reversed_selected); } #[test] - fn test_reverse_with_different_row_group_sizes() { - let metadata = create_test_metadata(vec![50, 150, 100]); - - let selection = RowSelection::from(vec![ - RowSelector::skip(25), - RowSelector::select(200), // Spans all row groups - RowSelector::skip(75), - ]); - - let row_groups_to_scan = vec![0, 1, 2]; - - let reversed = - reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); + fn test_prepared_access_plan_reverse_single_row_group() { + // Test: single row group case + let metadata = create_test_metadata(vec![100]); - let original_selected: usize = selection + let mut access_plan = ParquetAccessPlan::new_all(1); + access_plan.scan_selection( + 0, + RowSelection::from(vec![ + RowSelector::select(50), + RowSelector::skip(50), + ]), + ); + + let rg_metadata = metadata.row_groups(); + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); + + let original_selected: usize = prepared_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - let reversed_selected: usize = reversed + + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); + + // With single row group, row_group_indexes should remain [0] + assert_eq!(reversed_plan.row_group_indexes, vec![0]); + + let reversed_selected: usize = reversed_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); assert_eq!(original_selected, reversed_selected); + assert_eq!(original_selected, 50); } #[test] - fn test_reverse_single_row_group() { - let metadata = create_test_metadata(vec![100]); - - let selection = - RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]); - - // Only scanning the single row group (RG0) - let row_groups_to_scan = vec![0]; + fn test_prepared_access_plan_reverse_complex_pattern() { + // Test: complex pattern with multiple select/skip segments + let metadata = create_test_metadata(vec![100, 100, 100]); - let reversed = - reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); + let mut access_plan = ParquetAccessPlan::new_all(3); - // With single row group, selection should remain the same - let original_selected: usize = selection + // Complex pattern: select some, skip some, select some more + access_plan.scan_selection( + 0, + RowSelection::from(vec![ + RowSelector::select(30), + RowSelector::skip(40), + RowSelector::select(30), + ]), + ); + access_plan.scan_selection( + 1, + RowSelection::from(vec![ + RowSelector::skip(50), + RowSelector::select(50), + ]), + ); + access_plan.scan_selection( + 2, + RowSelection::from(vec![ + RowSelector::select(100), + ]), + ); + + let rg_metadata = metadata.row_groups(); + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); + + let original_selected: usize = prepared_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - let reversed_selected: usize = reversed + + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); + + let reversed_selected: usize = reversed_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); assert_eq!(original_selected, reversed_selected); + assert_eq!(original_selected, 210); // 30 + 30 + 50 + 100 } #[test] - fn test_reverse_complex_pattern() { - let metadata = create_test_metadata(vec![100, 100, 100]); + fn test_prepared_access_plan_reverse_with_skipped_row_groups() { + // This is the KEY test case for the bug fix! + // Test scenario where some row groups are completely skipped (not in scan plan) + let metadata = create_test_metadata(vec![100, 100, 100, 100]); - // Complex pattern: select some, skip some, select some more - let selection = RowSelection::from(vec![ - RowSelector::select(30), - RowSelector::skip(40), - RowSelector::select(80), - RowSelector::skip(50), - RowSelector::select(100), + // Scenario: RG0 (scan all), RG1 (completely skipped), RG2 (partial), RG3 (scan all) + // Only row groups [0, 2, 3] are in the scan plan + let mut access_plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Scan, // RG0 + RowGroupAccess::Skip, // RG1 - NOT in scan plan! + RowGroupAccess::Scan, // RG2 + RowGroupAccess::Scan, // RG3 ]); - let row_groups_to_scan = vec![0, 1, 2]; - - let reversed = - reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); - - let original_selected: usize = selection + // Add row selections for the scanned row groups + // Note: The RowSelection only covers row groups [0, 2, 3] (300 rows total) + access_plan.scan_selection( + 0, + RowSelection::from(vec![RowSelector::select(100)]), // RG0: all 100 rows + ); + // RG1 is skipped, no selection needed + access_plan.scan_selection( + 2, + RowSelection::from(vec![ + RowSelector::select(25), // RG2: first 25 rows + RowSelector::skip(75), // RG2: skip last 75 rows + ]), + ); + access_plan.scan_selection( + 3, + RowSelection::from(vec![RowSelector::select(100)]), // RG3: all 100 rows + ); + + let rg_metadata = metadata.row_groups(); + + // Step 1: Create PreparedAccessPlan + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); + + // Verify original plan + assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]); + let original_selected: usize = prepared_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - let reversed_selected: usize = reversed + assert_eq!(original_selected, 225); // 100 + 25 + 100 + + // Step 2: Reverse the plan (this is the production code path) + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); + + // Verify reversed results + // Row group order should be reversed: [3, 2, 0] + assert_eq!( + reversed_plan.row_group_indexes, + vec![3, 2, 0], + "Row groups should be reversed" + ); + + // Verify row selection is also correctly reversed + let reversed_selected: usize = reversed_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - assert_eq!(original_selected, reversed_selected); - assert_eq!(original_selected, 210); // 30 + 80 + 100 + assert_eq!( + reversed_selected, 225, + "Total selected rows should remain the same" + ); + + // Verify the reversed selection structure + // After reversal, the order becomes: RG3, RG2, RG0 + // - RG3: select(100) + // - RG2: select(25), skip(75) (note: internal order preserved, not reversed) + // - RG0: select(100) + // + // After RowSelection::from() merges adjacent selectors of the same type: + // - RG3's select(100) + RG2's select(25) = select(125) + // - RG2's skip(75) remains as skip(75) + // - RG0's select(100) remains as select(100) + let selectors: Vec<_> = reversed_plan.row_selection.as_ref().unwrap().iter().collect(); + assert_eq!(selectors.len(), 3); + + // RG3 (100) + RG2 first part (25) merged into select(125) + assert!(!selectors[0].skip); + assert_eq!(selectors[0].row_count, 125); + + // RG2: skip last 75 rows + assert!(selectors[1].skip); + assert_eq!(selectors[1].row_count, 75); + + // RG0: select all 100 rows + assert!(!selectors[2].skip); + assert_eq!(selectors[2].row_count, 100); } #[test] - fn test_reverse_with_skipped_row_group() { - // This test covers the "no specific selection" code path - let metadata = create_test_metadata(vec![100, 100, 100]); + fn test_prepared_access_plan_reverse_alternating_row_groups() { + // Test with alternating scan/skip pattern + let metadata = create_test_metadata(vec![100, 100, 100, 100]); - // Select only from first and third row groups, skip middle one entirely - let selection = RowSelection::from(vec![ - RowSelector::select(50), // First 50 of RG0 - RowSelector::skip(150), // Rest of RG0 + all of RG1 + half of RG2 - RowSelector::select(50), // Last 50 of RG2 + // Scan RG0 and RG2, skip RG1 and RG3 + let mut access_plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Scan, // RG0 + RowGroupAccess::Skip, // RG1 + RowGroupAccess::Scan, // RG2 + RowGroupAccess::Skip, // RG3 ]); - let row_groups_to_scan = vec![0, 1, 2]; - - let reversed = - reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); - - // Verify total selected rows remain the same - let original_selected: usize = selection + access_plan.scan_selection( + 0, + RowSelection::from(vec![RowSelector::select(100)]), + ); + access_plan.scan_selection( + 2, + RowSelection::from(vec![RowSelector::select(100)]), + ); + + let rg_metadata = metadata.row_groups(); + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); + + let original_selected: usize = prepared_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - let reversed_selected: usize = reversed + + // Original: [0, 2] + assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]); + + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); + + // After reverse: [2, 0] + assert_eq!(reversed_plan.row_group_indexes, vec![2, 0]); + + let reversed_selected: usize = reversed_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); assert_eq!(original_selected, reversed_selected); - assert_eq!(original_selected, 100); // 50 + 50 + assert_eq!(original_selected, 200); } #[test] - fn test_reverse_middle_row_group_only() { - // Another test to ensure skipped row groups are handled correctly + fn test_prepared_access_plan_reverse_middle_row_group_only() { + // Test selecting only the middle row group let metadata = create_test_metadata(vec![100, 100, 100]); - // Select only middle row group - let selection = RowSelection::from(vec![ - RowSelector::skip(100), // Skip RG0 - RowSelector::select(100), // Select all of RG1 - RowSelector::skip(100), // Skip RG2 + let mut access_plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Skip, // RG0 + RowGroupAccess::Scan, // RG1 + RowGroupAccess::Skip, // RG2 ]); - let row_groups_to_scan = vec![0, 1, 2]; + access_plan.scan_selection( + 1, + RowSelection::from(vec![RowSelector::select(100)]), // Select all of RG1 + ); - let reversed = - reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); + let rg_metadata = metadata.row_groups(); + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); - let original_selected: usize = selection + let original_selected: usize = prepared_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - let reversed_selected: usize = reversed + + // Original: [1] + assert_eq!(prepared_plan.row_group_indexes, vec![1]); + + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); + + // After reverse: still [1] (only one row group) + assert_eq!(reversed_plan.row_group_indexes, vec![1]); + + let reversed_selected: usize = reversed_plan + .row_selection + .as_ref() + .unwrap() .iter() .filter(|s| !s.skip) .map(|s| s.row_count) @@ -436,97 +709,333 @@ mod tests { } #[test] - fn test_reverse_alternating_row_groups() { - // Test with more complex skipping pattern - // File has 4 row groups, but we only scan first 3 + fn test_prepared_access_plan_reverse_with_skipped_row_groups_detailed() { + // This is the KEY test case for the bug fix! + // Test scenario where some row groups are completely skipped (not in scan plan) + // This version includes DETAILED verification of the selector distribution let metadata = create_test_metadata(vec![100, 100, 100, 100]); - // Select first and third row groups, skip second - // Note: Selection only covers first 3 row groups (300 rows) - let selection = RowSelection::from(vec![ - RowSelector::select(100), // RG0 - RowSelector::skip(100), // RG1 - RowSelector::select(100), // RG2 + // Scenario: RG0 (scan all), RG1 (completely skipped), RG2 (partial), RG3 (scan all) + // Only row groups [0, 2, 3] are in the scan plan + let mut access_plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Scan, // RG0 + RowGroupAccess::Skip, // RG1 - NOT in scan plan! + RowGroupAccess::Scan, // RG2 + RowGroupAccess::Scan, // RG3 ]); - // Only scanning first 3 row groups - let row_groups_to_scan = vec![0, 1, 2]; + // Add row selections for the scanned row groups + access_plan.scan_selection( + 0, + RowSelection::from(vec![RowSelector::select(100)]), // RG0: all 100 rows + ); + // RG1 is skipped, no selection needed + access_plan.scan_selection( + 2, + RowSelection::from(vec![ + RowSelector::select(25), // RG2: first 25 rows + RowSelector::skip(75), // RG2: skip last 75 rows + ]), + ); + access_plan.scan_selection( + 3, + RowSelection::from(vec![RowSelector::select(100)]), // RG3: all 100 rows + ); + + let rg_metadata = metadata.row_groups(); + + // Step 1: Create PreparedAccessPlan + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); + + // Verify original plan in detail + assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]); + + // Detailed verification of original selection + let orig_selectors: Vec<_> = prepared_plan + .row_selection + .as_ref() + .unwrap() + .iter() + .collect(); + + // Original structure should be: + // RG0: select(100) + // RG2: select(25), skip(75) + // RG3: select(100) + // After merging by RowSelection::from(): select(125), skip(75), select(100) + assert_eq!(orig_selectors.len(), 3, "Original should have 3 selectors after merging"); + assert!(!orig_selectors[0].skip && orig_selectors[0].row_count == 125, + "Original: First selector should be select(125) from RG0(100) + RG2(25)"); + assert!(orig_selectors[1].skip && orig_selectors[1].row_count == 75, + "Original: Second selector should be skip(75) from RG2"); + assert!(!orig_selectors[2].skip && orig_selectors[2].row_count == 100, + "Original: Third selector should be select(100) from RG3"); + + let original_selected: usize = orig_selectors + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + assert_eq!(original_selected, 225); // 100 + 25 + 100 + + // Step 2: Reverse the plan (this is the production code path) + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); + + // Verify reversed results + // Row group order should be reversed: [3, 2, 0] + assert_eq!( + reversed_plan.row_group_indexes, + vec![3, 2, 0], + "Row groups should be reversed" + ); + + // Detailed verification of reversed selection + let rev_selectors: Vec<_> = reversed_plan + .row_selection + .as_ref() + .unwrap() + .iter() + .collect(); - let reversed = - reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); + // After reversal, the order becomes: RG3, RG2, RG0 + // - RG3: select(100) + // - RG2: select(25), skip(75) (note: internal order preserved, not reversed) + // - RG0: select(100) + // + // After RowSelection::from() merges adjacent selectors of the same type: + // - RG3's select(100) + RG2's select(25) = select(125) + // - RG2's skip(75) remains as skip(75) + // - RG0's select(100) remains as select(100) - let original_selected: usize = selection + assert_eq!(rev_selectors.len(), 3, "Reversed should have 3 selectors after merging"); + + // First selector: RG3 (100) + RG2 first part (25) merged into select(125) + assert!( + !rev_selectors[0].skip && rev_selectors[0].row_count == 125, + "Reversed: First selector should be select(125) from RG3(100) + RG2(25), got skip={} count={}", + rev_selectors[0].skip, rev_selectors[0].row_count + ); + + // Second selector: RG2 skip last 75 rows + assert!( + rev_selectors[1].skip && rev_selectors[1].row_count == 75, + "Reversed: Second selector should be skip(75) from RG2, got skip={} count={}", + rev_selectors[1].skip, rev_selectors[1].row_count + ); + + // Third selector: RG0 select all 100 rows + assert!( + !rev_selectors[2].skip && rev_selectors[2].row_count == 100, + "Reversed: Third selector should be select(100) from RG0, got skip={} count={}", + rev_selectors[2].skip, rev_selectors[2].row_count + ); + + // Verify row selection is also correctly reversed (total count) + let reversed_selected: usize = rev_selectors .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - let reversed_selected: usize = reversed + + assert_eq!( + reversed_selected, 225, + "Total selected rows should remain the same" + ); + } + + #[test] + fn test_prepared_access_plan_reverse_complex_pattern_detailed() { + // Test: complex pattern with detailed verification + let metadata = create_test_metadata(vec![100, 100, 100]); + + let mut access_plan = ParquetAccessPlan::new_all(3); + + // Complex pattern: select some, skip some, select some more + access_plan.scan_selection( + 0, + RowSelection::from(vec![ + RowSelector::select(30), + RowSelector::skip(40), + RowSelector::select(30), + ]), + ); + access_plan.scan_selection( + 1, + RowSelection::from(vec![ + RowSelector::skip(50), + RowSelector::select(50), + ]), + ); + access_plan.scan_selection( + 2, + RowSelection::from(vec![ + RowSelector::select(100), + ]), + ); + + let rg_metadata = metadata.row_groups(); + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); + + // Verify original selection structure in detail + let orig_selectors: Vec<_> = prepared_plan + .row_selection + .as_ref() + .unwrap() + .iter() + .collect(); + + // RG0: select(30), skip(40), select(30) + // RG1: skip(50), select(50) + // RG2: select(100) + // Sequential: sel(30), skip(40), sel(30), skip(50), sel(50), sel(100) + // After merge: sel(30), skip(40), sel(30), skip(50), sel(150) + + let original_selected: usize = orig_selectors .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); + assert_eq!(original_selected, 210); // 30 + 30 + 50 + 100 - assert_eq!(original_selected, reversed_selected); - assert_eq!(original_selected, 200); + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); + + // Verify reversed selection structure + let rev_selectors: Vec<_> = reversed_plan + .row_selection + .as_ref() + .unwrap() + .iter() + .collect(); + + // After reversal: RG2, RG1, RG0 + // RG2: select(100) + // RG1: skip(50), select(50) + // RG0: select(30), skip(40), select(30) + // Sequential: sel(100), skip(50), sel(50), sel(30), skip(40), sel(30) + // After merge: sel(100), skip(50), sel(80), skip(40), sel(30) + + let reversed_selected: usize = rev_selectors + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!( + reversed_selected, 210, + "Total selected rows should remain the same (30 + 30 + 50 + 100)" + ); + + // Verify row group order + assert_eq!(reversed_plan.row_group_indexes, vec![2, 1, 0]); } #[test] - fn test_reverse_with_skipped_row_groups() { - // This is the key test case for the bug fix + fn test_prepared_access_plan_reverse_alternating_detailed() { + // Test with alternating scan/skip pattern with detailed verification let metadata = create_test_metadata(vec![100, 100, 100, 100]); - // Scenario: RG0 (scan all), RG1 (completely skipped), RG2 (partial), RG3 (scan all) - // The row selection only covers RG0, RG2, RG3 (300 rows total) - let selection = RowSelection::from(vec![ - RowSelector::select(100), // RG0: all 100 rows - RowSelector::select(25), // RG2: select first 25 rows - RowSelector::skip(75), // RG2: skip last 75 rows - RowSelector::select(100), // RG3: all 100 rows + // Scan RG0 and RG2, skip RG1 and RG3 + let mut access_plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Scan, // RG0 + RowGroupAccess::Skip, // RG1 + RowGroupAccess::Scan, // RG2 + RowGroupAccess::Skip, // RG3 ]); - // Only scanning RG0, RG2, RG3 (RG1 is not in the scan plan) - let row_groups_to_scan = vec![0, 2, 3]; - let reversed = - reverse_access_plan(selection.clone(), &metadata, &row_groups_to_scan); + access_plan.scan_selection( + 0, + RowSelection::from(vec![ + RowSelector::select(30), + RowSelector::skip(70), + ]), + ); + access_plan.scan_selection( + 2, + RowSelection::from(vec![ + RowSelector::skip(20), + RowSelector::select(80), + ]), + ); + + let rg_metadata = metadata.row_groups(); + let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); + + // Original: [0, 2] + assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]); + + // Verify original selection + let orig_selectors: Vec<_> = prepared_plan + .row_selection + .as_ref() + .unwrap() + .iter() + .collect(); + + // Original: + // RG0: select(30), skip(70) + // RG2: skip(20), select(80) + // Sequential: sel(30), skip(90), sel(80) + // (RG0's skip(70) + RG2's skip(20) = skip(90)) - // Verify total selected rows remain the same - let original_selected: usize = selection + let original_selected: usize = orig_selectors .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - let reversed_selected: usize = reversed + assert_eq!(original_selected, 110); // 30 + 80 + + let reversed_plan = prepared_plan + .reverse(&metadata) + .expect("Failed to reverse PreparedAccessPlan"); + + // After reverse: [2, 0] + assert_eq!(reversed_plan.row_group_indexes, vec![2, 0]); + + // Verify reversed selection + let rev_selectors: Vec<_> = reversed_plan + .row_selection + .as_ref() + .unwrap() + .iter() + .collect(); + + // After reversal: RG2, RG0 + // RG2: skip(20), select(80) + // RG0: select(30), skip(70) + // Sequential: skip(20), sel(110), skip(70) + // (RG2's select(80) + RG0's select(30) = select(110)) + + let reversed_selected: usize = rev_selectors .iter() .filter(|s| !s.skip) .map(|s| s.row_count) .sum(); - assert_eq!(original_selected, 225); // 100 + 25 + 100 - assert_eq!(reversed_selected, 225); + assert_eq!(reversed_selected, 110); // Should still be 30 + 80 - // Verify the reversed selection structure - // After reversal, the order becomes: RG3, RG2, RG0 - // - RG3: select(100) - // - RG2: select(25), skip(75) (note: internal order preserved, not reversed) - // - RG0: select(100) - // - // After RowSelection::from() merges adjacent selectors of the same type: - // - RG3's select(100) + RG2's select(25) = select(125) - // - RG2's skip(75) remains as skip(75) - // - RG0's select(100) remains as select(100) - let selectors: Vec<_> = reversed.iter().collect(); - assert_eq!(selectors.len(), 3); + // Detailed verification of structure + assert_eq!(rev_selectors.len(), 3, "Reversed should have 3 selectors"); - // RG3 (100) + RG2 first part (25) merged into select(125) - assert!(!selectors[0].skip); - assert_eq!(selectors[0].row_count, 125); + assert!( + rev_selectors[0].skip && rev_selectors[0].row_count == 20, + "First selector should be skip(20) from RG2" + ); - // RG2: skip last 75 rows - assert!(selectors[1].skip); - assert_eq!(selectors[1].row_count, 75); + assert!( + !rev_selectors[1].skip && rev_selectors[1].row_count == 110, + "Second selector should be select(110) from RG2(80) + RG0(30)" + ); - // RG0: select all 100 rows - assert!(!selectors[2].skip); - assert_eq!(selectors[2].row_count, 100); + assert!( + rev_selectors[2].skip && rev_selectors[2].row_count == 70, + "Third selector should be skip(70) from RG0" + ); } } From 2144a7d26873945ac748bc3142cd066e7dfc6299 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 30 Dec 2025 22:16:21 +0800 Subject: [PATCH 09/10] fmt --- datafusion/datasource-parquet/src/sort.rs | 216 ++++++++++------------ 1 file changed, 100 insertions(+), 116 deletions(-) diff --git a/datafusion/datasource-parquet/src/sort.rs b/datafusion/datasource-parquet/src/sort.rs index 4be2672133ce8..8845031556109 100644 --- a/datafusion/datasource-parquet/src/sort.rs +++ b/datafusion/datasource-parquet/src/sort.rs @@ -120,20 +120,22 @@ pub fn reverse_row_selection( #[cfg(test)] mod tests { - use crate::opener::PreparedAccessPlan; use crate::ParquetAccessPlan; use crate::RowGroupAccess; + use crate::opener::PreparedAccessPlan; use arrow::datatypes::{DataType, Field, Schema}; use bytes::Bytes; - use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::arrow::ArrowWriter; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::file::reader::FileReader; use parquet::file::serialized_reader::SerializedFileReader; use std::sync::Arc; /// Helper function to create a ParquetMetaData with specified row group sizes /// by actually writing a parquet file in memory - fn create_test_metadata(row_group_sizes: Vec) -> parquet::file::metadata::ParquetMetaData { + fn create_test_metadata( + row_group_sizes: Vec, + ) -> parquet::file::metadata::ParquetMetaData { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); let mut buffer = Vec::new(); { @@ -147,7 +149,7 @@ mod tests { schema.clone(), vec![Arc::new(array)], ) - .unwrap(); + .unwrap(); writer.write(&batch).unwrap(); writer.flush().unwrap(); } @@ -167,8 +169,9 @@ mod tests { let access_plan = ParquetAccessPlan::new_all(3); let rg_metadata = metadata.row_groups(); - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); // Verify original plan assert_eq!(prepared_plan.row_group_indexes, vec![0, 1, 2]); @@ -201,15 +204,13 @@ mod tests { // Select first 50 rows from first row group, skip rest access_plan.scan_selection( 0, - RowSelection::from(vec![ - RowSelector::select(50), - RowSelector::skip(50), - ]), + RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]), ); let rg_metadata = metadata.row_groups(); - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -249,22 +250,17 @@ mod tests { // Create selection that spans RG0 and RG1 access_plan.scan_selection( 0, - RowSelection::from(vec![ - RowSelector::skip(50), - RowSelector::select(50), - ]), + RowSelection::from(vec![RowSelector::skip(50), RowSelector::select(50)]), ); access_plan.scan_selection( 1, - RowSelection::from(vec![ - RowSelector::select(50), - RowSelector::skip(50), - ]), + RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]), ); let rg_metadata = metadata.row_groups(); - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -300,15 +296,14 @@ mod tests { // Skip all rows in all row groups for i in 0..3 { - access_plan.scan_selection( - i, - RowSelection::from(vec![RowSelector::skip(100)]), - ); + access_plan + .scan_selection(i, RowSelection::from(vec![RowSelector::skip(100)])); } let rg_metadata = metadata.row_groups(); - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let reversed_plan = prepared_plan .reverse(&metadata) @@ -337,28 +332,18 @@ mod tests { // Create complex selection pattern access_plan.scan_selection( 0, - RowSelection::from(vec![ - RowSelector::skip(25), - RowSelector::select(25), - ]), - ); - access_plan.scan_selection( - 1, - RowSelection::from(vec![ - RowSelector::select(150), - ]), + RowSelection::from(vec![RowSelector::skip(25), RowSelector::select(25)]), ); + access_plan.scan_selection(1, RowSelection::from(vec![RowSelector::select(150)])); access_plan.scan_selection( 2, - RowSelection::from(vec![ - RowSelector::select(50), - RowSelector::skip(50), - ]), + RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]), ); let rg_metadata = metadata.row_groups(); - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -393,15 +378,13 @@ mod tests { let mut access_plan = ParquetAccessPlan::new_all(1); access_plan.scan_selection( 0, - RowSelection::from(vec![ - RowSelector::select(50), - RowSelector::skip(50), - ]), + RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]), ); let rg_metadata = metadata.row_groups(); - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -450,21 +433,14 @@ mod tests { ); access_plan.scan_selection( 1, - RowSelection::from(vec![ - RowSelector::skip(50), - RowSelector::select(50), - ]), - ); - access_plan.scan_selection( - 2, - RowSelection::from(vec![ - RowSelector::select(100), - ]), + RowSelection::from(vec![RowSelector::skip(50), RowSelector::select(50)]), ); + access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)])); let rg_metadata = metadata.row_groups(); - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -517,8 +493,8 @@ mod tests { access_plan.scan_selection( 2, RowSelection::from(vec![ - RowSelector::select(25), // RG2: first 25 rows - RowSelector::skip(75), // RG2: skip last 75 rows + RowSelector::select(25), // RG2: first 25 rows + RowSelector::skip(75), // RG2: skip last 75 rows ]), ); access_plan.scan_selection( @@ -529,8 +505,9 @@ mod tests { let rg_metadata = metadata.row_groups(); // Step 1: Create PreparedAccessPlan - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); // Verify original plan assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]); @@ -582,7 +559,12 @@ mod tests { // - RG3's select(100) + RG2's select(25) = select(125) // - RG2's skip(75) remains as skip(75) // - RG0's select(100) remains as select(100) - let selectors: Vec<_> = reversed_plan.row_selection.as_ref().unwrap().iter().collect(); + let selectors: Vec<_> = reversed_plan + .row_selection + .as_ref() + .unwrap() + .iter() + .collect(); assert_eq!(selectors.len(), 3); // RG3 (100) + RG2 first part (25) merged into select(125) @@ -611,18 +593,13 @@ mod tests { RowGroupAccess::Skip, // RG3 ]); - access_plan.scan_selection( - 0, - RowSelection::from(vec![RowSelector::select(100)]), - ); - access_plan.scan_selection( - 2, - RowSelection::from(vec![RowSelector::select(100)]), - ); + access_plan.scan_selection(0, RowSelection::from(vec![RowSelector::select(100)])); + access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)])); let rg_metadata = metadata.row_groups(); - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -673,8 +650,9 @@ mod tests { ); let rg_metadata = metadata.row_groups(); - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -733,8 +711,8 @@ mod tests { access_plan.scan_selection( 2, RowSelection::from(vec![ - RowSelector::select(25), // RG2: first 25 rows - RowSelector::skip(75), // RG2: skip last 75 rows + RowSelector::select(25), // RG2: first 25 rows + RowSelector::skip(75), // RG2: skip last 75 rows ]), ); access_plan.scan_selection( @@ -745,8 +723,9 @@ mod tests { let rg_metadata = metadata.row_groups(); // Step 1: Create PreparedAccessPlan - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); // Verify original plan in detail assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]); @@ -764,13 +743,23 @@ mod tests { // RG2: select(25), skip(75) // RG3: select(100) // After merging by RowSelection::from(): select(125), skip(75), select(100) - assert_eq!(orig_selectors.len(), 3, "Original should have 3 selectors after merging"); - assert!(!orig_selectors[0].skip && orig_selectors[0].row_count == 125, - "Original: First selector should be select(125) from RG0(100) + RG2(25)"); - assert!(orig_selectors[1].skip && orig_selectors[1].row_count == 75, - "Original: Second selector should be skip(75) from RG2"); - assert!(!orig_selectors[2].skip && orig_selectors[2].row_count == 100, - "Original: Third selector should be select(100) from RG3"); + assert_eq!( + orig_selectors.len(), + 3, + "Original should have 3 selectors after merging" + ); + assert!( + !orig_selectors[0].skip && orig_selectors[0].row_count == 125, + "Original: First selector should be select(125) from RG0(100) + RG2(25)" + ); + assert!( + orig_selectors[1].skip && orig_selectors[1].row_count == 75, + "Original: Second selector should be skip(75) from RG2" + ); + assert!( + !orig_selectors[2].skip && orig_selectors[2].row_count == 100, + "Original: Third selector should be select(100) from RG3" + ); let original_selected: usize = orig_selectors .iter() @@ -810,27 +799,34 @@ mod tests { // - RG2's skip(75) remains as skip(75) // - RG0's select(100) remains as select(100) - assert_eq!(rev_selectors.len(), 3, "Reversed should have 3 selectors after merging"); + assert_eq!( + rev_selectors.len(), + 3, + "Reversed should have 3 selectors after merging" + ); // First selector: RG3 (100) + RG2 first part (25) merged into select(125) assert!( !rev_selectors[0].skip && rev_selectors[0].row_count == 125, "Reversed: First selector should be select(125) from RG3(100) + RG2(25), got skip={} count={}", - rev_selectors[0].skip, rev_selectors[0].row_count + rev_selectors[0].skip, + rev_selectors[0].row_count ); // Second selector: RG2 skip last 75 rows assert!( rev_selectors[1].skip && rev_selectors[1].row_count == 75, "Reversed: Second selector should be skip(75) from RG2, got skip={} count={}", - rev_selectors[1].skip, rev_selectors[1].row_count + rev_selectors[1].skip, + rev_selectors[1].row_count ); // Third selector: RG0 select all 100 rows assert!( !rev_selectors[2].skip && rev_selectors[2].row_count == 100, "Reversed: Third selector should be select(100) from RG0, got skip={} count={}", - rev_selectors[2].skip, rev_selectors[2].row_count + rev_selectors[2].skip, + rev_selectors[2].row_count ); // Verify row selection is also correctly reversed (total count) @@ -864,21 +860,14 @@ mod tests { ); access_plan.scan_selection( 1, - RowSelection::from(vec![ - RowSelector::skip(50), - RowSelector::select(50), - ]), - ); - access_plan.scan_selection( - 2, - RowSelection::from(vec![ - RowSelector::select(100), - ]), + RowSelection::from(vec![RowSelector::skip(50), RowSelector::select(50)]), ); + access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)])); let rg_metadata = metadata.row_groups(); - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); // Verify original selection structure in detail let orig_selectors: Vec<_> = prepared_plan @@ -950,22 +939,17 @@ mod tests { access_plan.scan_selection( 0, - RowSelection::from(vec![ - RowSelector::select(30), - RowSelector::skip(70), - ]), + RowSelection::from(vec![RowSelector::select(30), RowSelector::skip(70)]), ); access_plan.scan_selection( 2, - RowSelection::from(vec![ - RowSelector::skip(20), - RowSelector::select(80), - ]), + RowSelection::from(vec![RowSelector::skip(20), RowSelector::select(80)]), ); let rg_metadata = metadata.row_groups(); - let prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) + .expect("Failed to create PreparedAccessPlan"); // Original: [0, 2] assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]); From 0995132ef7a0e0f0a940378d7c3e8175d54da029 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 30 Dec 2025 23:11:48 +0800 Subject: [PATCH 10/10] Address new comments --- datafusion/datasource-parquet/src/sort.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/datafusion/datasource-parquet/src/sort.rs b/datafusion/datasource-parquet/src/sort.rs index 8845031556109..abc50eeb317d5 100644 --- a/datafusion/datasource-parquet/src/sort.rs +++ b/datafusion/datasource-parquet/src/sort.rs @@ -176,6 +176,9 @@ mod tests { // Verify original plan assert_eq!(prepared_plan.row_group_indexes, vec![0, 1, 2]); + // No row selection originally due to scanning all rows + assert_eq!(prepared_plan.row_selection, None); + let reversed_plan = prepared_plan .reverse(&metadata) .expect("Failed to reverse PreparedAccessPlan"); @@ -183,15 +186,9 @@ mod tests { // Verify row groups are reversed assert_eq!(reversed_plan.row_group_indexes, vec![2, 1, 0]); - // If no selection originally, after reversal should still select all rows - if let Some(selection) = reversed_plan.row_selection { - let total_selected: usize = selection - .iter() - .filter(|s| !s.skip) - .map(|s| s.row_count) - .sum(); - assert_eq!(total_selected, 300); - } + // If no selection originally, after reversal should still select all rows, + // and the selection should be None + assert_eq!(reversed_plan.row_selection, None); } #[test]