Skip to content

feat: ADBC-backed Spark DataSource for DataFusion table providers#111

Draft
timsaucer wants to merge 5 commits into
apache:mainfrom
timsaucer:feat/adbc-spark-connector
Draft

feat: ADBC-backed Spark DataSource for DataFusion table providers#111
timsaucer wants to merge 5 commits into
apache:mainfrom
timsaucer:feat/adbc-spark-connector

Conversation

@timsaucer

@timsaucer timsaucer commented Jun 30, 2026

Copy link
Copy Markdown
Member

Which issue does this PR close?

Rationale for this change

Spark users want to read DataFusion TableProvider data as a native Spark DataSourceV2. Rather than a bespoke per-operation JNI surface, this places the native boundary at a standard ADBC driver: Spark talks to the upstream arrow-adbc Java driver manager (adbc-core + adbc-driver-jni), which loads a native DataFusion ADBC cdylib. This reuses the upstream ADBC bindings instead of reproducing them, and keeps batches zero-copy on the cluster-provided Arrow.

What changes are included in this PR?

Spark connector (spark/) — a DataSourceV2 registered as the adbc-datafusion format:

  • AdbcDatafusionTableProvider: schema probed once on the driver via AdbcConnection.getTableSchema.
  • Projection / filter / limit pushed into the scan as a Substrait plan, with a SQL fallback path.
  • Multi-partition reads via executePartitioned / readPartition; target_partitions option tunes scan parallelism.
  • Per-executor AdbcConnectionPool caches the AdbcDatabase per driver+options key; per-task connection (level 1) is the safe default, connection sharing (level 2) is gated off by default.
  • Batches imported zero-copy as ArrowColumnVector on the cluster-provided Arrow.

Example driver (examples/adbc-datafusion-driver/) — a DataFusion ADBC cdylib exercising the full stack, with a PySpark end-to-end script and partitioning tests.

Docs — ADBC-to-Spark partitioning guidance and connector notes.

Are these changes tested?

Yes — 19/19 passing locally.

Unit (no native driver):

  • AdbcConnectionPoolTest (8) — per-executor cache sharing + concurrency.
  • AdbcOptionsTest (4) — option decoding / cache key.
  • SubstraitPlanTest (3) — projection/filter/limit pushdown encoding.

End-to-end (AdbcSourceTest, 4 tests) — verified against the example DataFusion ADBC cdylib built from examples/adbc-datafusion-driver, with the adbc-snapshot profile (arrow-adbc 0.24-SNAPSHOT). Full stack: Spark DataSourceV2 → arrow-adbc JNI driver manager → DataFusion ADBC cdylib → custom provider:

  • projectionPushdown, filterPushdown — Substrait pushdown through the driver.
  • fullScanAcrossPartitions — the provider's 3 partitions surface as 3 Spark input partitions (multi-partition executePartitioned/readPartition).
  • providerRegisteredOncePerExecutor — across local[8] slots the pool builds exactly one native database (ContextInit runs once), each task opening its own connection.

The test is gated on -Dadbc.example.driver.path; absent the cdylib it is skipped, so CI stays green without the native artifact. The example driver also ships pyspark_e2e.py and tests/partitions.rs.

Are there any user-facing changes?

Yes — a new adbc-datafusion Spark data source format and its options (driver, table, target_partitions, ...). Additive; no changes to existing public APIs. Documentation is included.

@timsaucer timsaucer force-pushed the feat/adbc-spark-connector branch from 97dbf85 to 203bc9d Compare June 30, 2026 20:28
@timsaucer timsaucer marked this pull request as draft June 30, 2026 20:36
@timsaucer timsaucer force-pushed the feat/adbc-spark-connector branch from 203bc9d to 763d725 Compare June 30, 2026 20:44
@timsaucer

Copy link
Copy Markdown
Member Author

End-to-end run results

Built the example DataFusion ADBC driver and ran the full gated E2E locally (arrow-adbc 0.24-SNAPSHOT via -Padbc-snapshot):

(cd examples/adbc-datafusion-driver && cargo build --release)
mvn -pl spark -am test -Dtest=AdbcSourceTest -Padbc-snapshot \
  -Dsurefire.failIfNoSpecifiedTests=false \
  -Dadbc.example.driver.path=$PWD/rust-target/release/libadbc_datafusion_example_driver.dylib
Suite Tests Result
AdbcConnectionPoolTest 8
AdbcOptionsTest 4
SubstraitPlanTest 3
AdbcSourceTest (E2E) 4

19/19 passing, 0 failures. fullScanAcrossPartitions confirmed the provider's 3 partitions surfacing as 3 Spark input partitions; providerRegisteredOncePerExecutor confirmed one native database across local[8] task slots.

Stack exercised: Spark DataSourceV2 → arrow-adbc JNI driver manager → DataFusion ADBC cdylib → custom provider.

@timsaucer timsaucer force-pushed the feat/adbc-spark-connector branch 6 times, most recently from 01b3834 to 0c21731 Compare July 1, 2026 11:25
Add a Spark DataSourceV2 connector that reads from a DataFusion
TableProvider through a standard ADBC driver. Spark talks to the
upstream arrow-adbc Java driver manager (adbc-core + adbc-driver-jni),
which loads a native DataFusion ADBC cdylib and returns arrow-java
ArrowReaders consumed zero-copy as ArrowColumnVectors on the
cluster-provided Arrow.

Spark connector (spark/):
- AdbcDatafusionTableProvider: registers the `adbc-datafusion` format;
  schema probed once on the driver via AdbcConnection.getTableSchema.
- Scan with projection/filter/limit pushed into the scan as a Substrait
  plan, with a SQL fallback path.
- Multi-partition reads via executePartitioned/readPartition; the
  target_partitions option tunes scan parallelism.
- Per-executor AdbcConnectionPool caches the AdbcDatabase per driver +
  options key; each task opens its own connection off it (connections
  are not shared across tasks: the arrow-adbc FFI exporter aliases &mut
  to one connection, which would be undefined behavior).

Because the database is cached per executor and the driver's plan cache
is database-scoped, the N per-task connections of one scan deserialize
the physical plan once, not once per task. AdbcSourceTest asserts this
via the driver's plan-deserialize counter.

Example driver (examples/adbc-datafusion-driver/): a DataFusion ADBC
cdylib exercising the full stack, with a PySpark end-to-end script and
partitioning tests. Pinned to a driver rev for reproducible builds.

Docs: user-guide page "DataFusion as a Spark data source (ADBC)" in the
documentation site, covering what the connector provides, how to read
from Spark, and how DataFusion's parallelism-bound partitioning maps
onto Spark's byte-bound task model.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@timsaucer timsaucer force-pushed the feat/adbc-spark-connector branch from 0c21731 to 60a824b Compare July 1, 2026 13:12
timsaucer and others added 4 commits July 1, 2026 15:26
The connector's SchemaConverter mapped only a handful of Arrow types and
failed load() on anything else, since inferSchema probes the full table
schema up front. Extend it to cover every type Spark's ArrowColumnVector
can read, and for the types it cannot read directly, push a cast into the
scan so executors emit Spark-native Arrow (zero-copy import stays intact).

- SchemaConverter: full recursive Arrow -> Spark map. Directly representable
  types (binary, nested list/struct/map, date, decimal, us timestamp, null)
  pass through; cast-required types (unsigned ints, Float16, non-us
  timestamps, time) map to their widened Spark type and expose an arrow_cast
  target string. u64 -> Decimal(20,0) to stay lossless past i64::MAX.
- SqlQuery: wrap cast columns in arrow_cast(col, '<type>') AS col.
- AdbcScanImpl: force the SQL wire when the schema needs a cast (Substrait
  cannot encode unsigned/Float16); partitioning is unaffected (ADBC
  executePartitioned works on the physical plan, and the cast is a
  partition-preserving projection).
- AdbcScanBuilder: keep filter pushdown off cast columns (a pushed predicate
  runs against the pre-cast source domain).

Tests: SchemaConverterTest and SqlQueryTest cover the type map and the
arrow_cast string generation. The example driver gains a second `types`
table spanning these Arrow types with known values (built as self-contained
single-row batches so sliced-array offsets do not corrupt variable-width
columns across the C-data boundary), and AdbcSourceTest asserts the schema,
per-value round-trip through the casts, and multi-partition execution.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
count() prunes the pushed projection to empty, which rendered a bare
SELECT * -- returning the raw, uncast schema. A non-Spark-native column
(e.g. Timestamp(NANOSECOND), unsigned) then reached the reader and failed
with UNSUPPORTED_ARROWTYPE. Only the empty-projection case leaked; the
all-columns (null projection) path already injected casts.

When the projection prunes to empty and the table has any cast column,
emit a single readable probe column instead of SELECT *
(SchemaConverter.probeColumn: prefer a castless column, else the first
column with its cast applied). A count/column-less scan only needs the row
count, and the emitted stream stays Spark-native. All-native tables keep
plain SELECT *.

Tests: SchemaConverterTest.probeColumn* (unit) and
AdbcSourceTest.typesCountWorks* (E2E, reproduces the failure); the example
pyspark harness gains a types-table count() assertion.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Spark's ArrowColumnVector backs ArrayType only from a variable ListVector,
never a FixedSizeListVector, so a fixed-size list reached the reader and
failed with UNSUPPORTED_ARROWTYPE FixedSizeList. SchemaConverter mapped it
to ArrayType (schema probe passed) but needsCast did not flag it, so no
arrow_cast was emitted; renderArrowType would also have produced an
identity FixedSizeList(N) target.

- needsCast: flag FixedSizeList unconditionally (the fixed layout is
  unreadable regardless of element type).
- renderArrowType: render FixedSizeList as a variable List(<elem>) so
  arrow_cast converts fixed->variable, with the element rendered cast-aware
  (e.g. FixedSizeList<Float16> -> List(Float32)).

Tests: SchemaConverterTest.fixedSizeListCastsToVariableList (unit); the
example `types` table gains a FixedSizeList<UInt16,2> `vec` column and
AdbcSourceTest / the pyspark harness assert it round-trips to Array<Integer>
with the element widened.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ne authority

Enumerating "maps to a Spark type" and "readable by ArrowColumnVector"
by hand let them drift: a type could map to a Spark DataType yet have no
vectorized accessor, crashing at task time (FixedSizeList, and latent for
LargeList / FixedSizeBinary / Date64). Replace the three hand-kept switches
with a single authority.

- sparkConsumable(ArrowType): the read gate, mirroring Spark 4.0's
  ArrowColumnVector.initAccessor accessor set (verified against the class),
  deliberately the narrower gate.
- sparkTarget(Field): the nearest consumable field, recursing into children
  (identity when already consumable). One place normalizes unsigned->signed,
  Float16->Float32, non-us timestamp->us, Date64->Date32, Time->int,
  FixedSizeBinary->Binary, FixedSizeList/LargeList->List.

toSparkType, needsCast, and castTargetString now all derive from
sparkTarget, so the gates cannot disagree and adding a type is one case.
toSparkSchema asserts at plan time that every target is consumable, so an
unsupported type (e.g. Interval) fails in planning naming the column,
instead of an opaque executor UNSUPPORTED_ARROWTYPE.

Closes three latent bugs alongside FixedSizeList: LargeList, FixedSizeBinary,
and Date64 previously mapped to a Spark type without a cast.

Tests: SchemaConverterTest gains cases for LargeList/FixedSizeBinary/Date64,
a "sparkTarget is always consumable" invariant, and a fail-fast case; the
example `types` table gains labels (LargeList<Utf8>), digest
(FixedSizeBinary), and day (Date64) columns, asserted end-to-end in
AdbcSourceTest and the pyspark harness. The E2E session enables the Java 8
date API so DateType decodes without needing sun.util.calendar opened.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Spark DataSource backed by a DataFusion TableProvider over ADBC

1 participant