feat: ADBC-backed Spark DataSource for DataFusion table providers#111
Draft
timsaucer wants to merge 5 commits into
Draft
feat: ADBC-backed Spark DataSource for DataFusion table providers#111timsaucer wants to merge 5 commits into
timsaucer wants to merge 5 commits into
Conversation
97dbf85 to
203bc9d
Compare
203bc9d to
763d725
Compare
Member
Author
End-to-end run resultsBuilt the example DataFusion ADBC driver and ran the full gated E2E locally (arrow-adbc 0.24-SNAPSHOT via
19/19 passing, 0 failures. Stack exercised: Spark DataSourceV2 → arrow-adbc JNI driver manager → DataFusion ADBC cdylib → custom provider. |
01b3834 to
0c21731
Compare
Add a Spark DataSourceV2 connector that reads from a DataFusion TableProvider through a standard ADBC driver. Spark talks to the upstream arrow-adbc Java driver manager (adbc-core + adbc-driver-jni), which loads a native DataFusion ADBC cdylib and returns arrow-java ArrowReaders consumed zero-copy as ArrowColumnVectors on the cluster-provided Arrow. Spark connector (spark/): - AdbcDatafusionTableProvider: registers the `adbc-datafusion` format; schema probed once on the driver via AdbcConnection.getTableSchema. - Scan with projection/filter/limit pushed into the scan as a Substrait plan, with a SQL fallback path. - Multi-partition reads via executePartitioned/readPartition; the target_partitions option tunes scan parallelism. - Per-executor AdbcConnectionPool caches the AdbcDatabase per driver + options key; each task opens its own connection off it (connections are not shared across tasks: the arrow-adbc FFI exporter aliases &mut to one connection, which would be undefined behavior). Because the database is cached per executor and the driver's plan cache is database-scoped, the N per-task connections of one scan deserialize the physical plan once, not once per task. AdbcSourceTest asserts this via the driver's plan-deserialize counter. Example driver (examples/adbc-datafusion-driver/): a DataFusion ADBC cdylib exercising the full stack, with a PySpark end-to-end script and partitioning tests. Pinned to a driver rev for reproducible builds. Docs: user-guide page "DataFusion as a Spark data source (ADBC)" in the documentation site, covering what the connector provides, how to read from Spark, and how DataFusion's parallelism-bound partitioning maps onto Spark's byte-bound task model. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
0c21731 to
60a824b
Compare
The connector's SchemaConverter mapped only a handful of Arrow types and failed load() on anything else, since inferSchema probes the full table schema up front. Extend it to cover every type Spark's ArrowColumnVector can read, and for the types it cannot read directly, push a cast into the scan so executors emit Spark-native Arrow (zero-copy import stays intact). - SchemaConverter: full recursive Arrow -> Spark map. Directly representable types (binary, nested list/struct/map, date, decimal, us timestamp, null) pass through; cast-required types (unsigned ints, Float16, non-us timestamps, time) map to their widened Spark type and expose an arrow_cast target string. u64 -> Decimal(20,0) to stay lossless past i64::MAX. - SqlQuery: wrap cast columns in arrow_cast(col, '<type>') AS col. - AdbcScanImpl: force the SQL wire when the schema needs a cast (Substrait cannot encode unsigned/Float16); partitioning is unaffected (ADBC executePartitioned works on the physical plan, and the cast is a partition-preserving projection). - AdbcScanBuilder: keep filter pushdown off cast columns (a pushed predicate runs against the pre-cast source domain). Tests: SchemaConverterTest and SqlQueryTest cover the type map and the arrow_cast string generation. The example driver gains a second `types` table spanning these Arrow types with known values (built as self-contained single-row batches so sliced-array offsets do not corrupt variable-width columns across the C-data boundary), and AdbcSourceTest asserts the schema, per-value round-trip through the casts, and multi-partition execution. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
count() prunes the pushed projection to empty, which rendered a bare SELECT * -- returning the raw, uncast schema. A non-Spark-native column (e.g. Timestamp(NANOSECOND), unsigned) then reached the reader and failed with UNSUPPORTED_ARROWTYPE. Only the empty-projection case leaked; the all-columns (null projection) path already injected casts. When the projection prunes to empty and the table has any cast column, emit a single readable probe column instead of SELECT * (SchemaConverter.probeColumn: prefer a castless column, else the first column with its cast applied). A count/column-less scan only needs the row count, and the emitted stream stays Spark-native. All-native tables keep plain SELECT *. Tests: SchemaConverterTest.probeColumn* (unit) and AdbcSourceTest.typesCountWorks* (E2E, reproduces the failure); the example pyspark harness gains a types-table count() assertion. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Spark's ArrowColumnVector backs ArrayType only from a variable ListVector, never a FixedSizeListVector, so a fixed-size list reached the reader and failed with UNSUPPORTED_ARROWTYPE FixedSizeList. SchemaConverter mapped it to ArrayType (schema probe passed) but needsCast did not flag it, so no arrow_cast was emitted; renderArrowType would also have produced an identity FixedSizeList(N) target. - needsCast: flag FixedSizeList unconditionally (the fixed layout is unreadable regardless of element type). - renderArrowType: render FixedSizeList as a variable List(<elem>) so arrow_cast converts fixed->variable, with the element rendered cast-aware (e.g. FixedSizeList<Float16> -> List(Float32)). Tests: SchemaConverterTest.fixedSizeListCastsToVariableList (unit); the example `types` table gains a FixedSizeList<UInt16,2> `vec` column and AdbcSourceTest / the pyspark harness assert it round-trips to Array<Integer> with the element widened. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ne authority Enumerating "maps to a Spark type" and "readable by ArrowColumnVector" by hand let them drift: a type could map to a Spark DataType yet have no vectorized accessor, crashing at task time (FixedSizeList, and latent for LargeList / FixedSizeBinary / Date64). Replace the three hand-kept switches with a single authority. - sparkConsumable(ArrowType): the read gate, mirroring Spark 4.0's ArrowColumnVector.initAccessor accessor set (verified against the class), deliberately the narrower gate. - sparkTarget(Field): the nearest consumable field, recursing into children (identity when already consumable). One place normalizes unsigned->signed, Float16->Float32, non-us timestamp->us, Date64->Date32, Time->int, FixedSizeBinary->Binary, FixedSizeList/LargeList->List. toSparkType, needsCast, and castTargetString now all derive from sparkTarget, so the gates cannot disagree and adding a type is one case. toSparkSchema asserts at plan time that every target is consumable, so an unsupported type (e.g. Interval) fails in planning naming the column, instead of an opaque executor UNSUPPORTED_ARROWTYPE. Closes three latent bugs alongside FixedSizeList: LargeList, FixedSizeBinary, and Date64 previously mapped to a Spark type without a cast. Tests: SchemaConverterTest gains cases for LargeList/FixedSizeBinary/Date64, a "sparkTarget is always consumable" invariant, and a fail-fast case; the example `types` table gains labels (LargeList<Utf8>), digest (FixedSizeBinary), and day (Date64) columns, asserted end-to-end in AdbcSourceTest and the pyspark harness. The E2E session enables the Java 8 date API so DateType decodes without needing sun.util.calendar opened. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Rationale for this change
Spark users want to read DataFusion
TableProviderdata as a native SparkDataSourceV2. Rather than a bespoke per-operation JNI surface, this places the native boundary at a standard ADBC driver: Spark talks to the upstream arrow-adbc Java driver manager (adbc-core+adbc-driver-jni), which loads a native DataFusion ADBC cdylib. This reuses the upstream ADBC bindings instead of reproducing them, and keeps batches zero-copy on the cluster-provided Arrow.What changes are included in this PR?
Spark connector (
spark/) — aDataSourceV2registered as theadbc-datafusionformat:AdbcDatafusionTableProvider: schema probed once on the driver viaAdbcConnection.getTableSchema.executePartitioned/readPartition;target_partitionsoption tunes scan parallelism.AdbcConnectionPoolcaches theAdbcDatabaseper driver+options key; per-task connection (level 1) is the safe default, connection sharing (level 2) is gated off by default.ArrowColumnVectoron the cluster-provided Arrow.Example driver (
examples/adbc-datafusion-driver/) — a DataFusion ADBC cdylib exercising the full stack, with a PySpark end-to-end script and partitioning tests.Docs — ADBC-to-Spark partitioning guidance and connector notes.
Are these changes tested?
Yes — 19/19 passing locally.
Unit (no native driver):
AdbcConnectionPoolTest(8) — per-executor cache sharing + concurrency.AdbcOptionsTest(4) — option decoding / cache key.SubstraitPlanTest(3) — projection/filter/limit pushdown encoding.End-to-end (
AdbcSourceTest, 4 tests) — verified against the example DataFusion ADBC cdylib built fromexamples/adbc-datafusion-driver, with theadbc-snapshotprofile (arrow-adbc 0.24-SNAPSHOT). Full stack: Spark DataSourceV2 → arrow-adbc JNI driver manager → DataFusion ADBC cdylib → custom provider:projectionPushdown,filterPushdown— Substrait pushdown through the driver.fullScanAcrossPartitions— the provider's 3 partitions surface as 3 Spark input partitions (multi-partitionexecutePartitioned/readPartition).providerRegisteredOncePerExecutor— acrosslocal[8]slots the pool builds exactly one native database (ContextInit runs once), each task opening its own connection.The test is gated on
-Dadbc.example.driver.path; absent the cdylib it is skipped, so CI stays green without the native artifact. The example driver also shipspyspark_e2e.pyandtests/partitions.rs.Are there any user-facing changes?
Yes — a new
adbc-datafusionSpark data source format and its options (driver,table,target_partitions, ...). Additive; no changes to existing public APIs. Documentation is included.