Support distributed processing by pickling expressions #1543

Draft

timsaucer wants to merge 57 commits into apache:main from timsaucer:feat/expr-pickle

Conversation

timsaucer (Member) commented May 15, 2026

Status

I am going to leave this in draft so reviewers can see the entire solution at once, but I will break it into parts for more effective review.

Which issue does this PR close?

Closes #1517

Rationale for this change

One model for distributing datafusion-python work is to break a query apart at the expression level and compute the individual portions on different workers. This currently does not work because `PyExpr` cannot be pickled. This PR adds that support via the updated Python codecs that landed in #1541.

What changes are included in this PR?

  • Updates Scalar, Aggregate, and Window functions to support pickling requirements
  • Adds Codec work for doing the actual pickling
  • Adds unit tests
  • Updates documentation site to describe modes of distributed work
  • Adds support to disable pickling for untrusted input
  • Reorders Expr.from_bytes arguments from (ctx, data) to (buf, ctx=None) and converts it to a classmethod so ctx is optional and the worker / global context is consulted by default — needed for pickle.loads, which has no place to thread an explicit context
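
For illustration, a hedged sketch of the new call shape (`some_expr` stands in for any previously built expression):

```python
from datafusion import Expr, SessionContext

buf = some_expr.to_bytes()

# Old (PR #1541, never released): Expr.from_bytes(ctx, buf)
# New: classmethod with an optional context.
expr = Expr.from_bytes(buf)                        # worker / global context consulted
expr = Expr.from_bytes(buf, ctx=SessionContext())  # or thread one explicitly
```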

Are there any user-facing changes?

This should be pure addition from the user's point of view. Expr.from_bytes did change argument order, but the previous signature (from_bytes(ctx, data)) landed in #1541 and has not appeared in a released wheel, so no published API is affected.

timsaucer and others added 18 commits May 15, 2026 09:03
Builds on the codec consistency work in feat/proto-codecs. Python
scalar UDFs are cloudpickled inline into the proto `fun_definition`
field by PythonLogicalCodec / PythonPhysicalCodec, so a pickled Expr
that references a Python `udf()` reconstructs on the receiver with
no pre-registration. UDAFs, UDWFs, and FFI-imported UDFs still
resolve through the receiver's session.

Rust:
* `PythonFunctionScalarUDF` regains the `func()` / `input_fields()` /
  `return_field()` / `volatility()` / `from_parts()` accessors the
  codec needs.
* `crates/core/src/codec.rs` adds shared
  `try_encode_python_scalar_udf` / `try_decode_python_scalar_udf`
  helpers built on cloudpickle + pyarrow IPC for the input schema.
  Both `PythonLogicalCodec.try_encode_udf` and
  `PythonPhysicalCodec.try_encode_udf` consult the helper first and
  fall back to `inner` for non-Python UDFs (and the receiver's
  function registry on decode if the prefix does not match).

Python:
* `datafusion.ipc` module: thread-local `set_worker_ctx` /
  `clear_worker_ctx` / `get_worker_ctx` for installing a receiver
  `SessionContext` on a worker process. `_resolve_ctx` returns
  explicit > worker > fresh.
* `Expr.__reduce__` returns `(Expr._reconstruct, (self.to_bytes(),))`.
  `_reconstruct` calls `Expr.from_bytes(buf, ctx=None)` which
  consults the worker context.
* `Expr.from_bytes` signature switches to `(buf, ctx=None)` (was
  `(ctx, buf)`); no callers in main, only PR1 tests which are
  updated.
* `datafusion.ipc` exported from the top-level package.
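
A hedged sketch of the worker-side lifecycle these pieces add
(`blob` stands in for pickled expression bytes from the driver):

```python
import pickle

from datafusion import SessionContext
from datafusion.ipc import clear_worker_ctx, set_worker_ctx

def init_worker() -> None:
    # Run once per worker (e.g. as a Pool initializer): install the
    # context that decoded expressions resolve against.
    set_worker_ctx(SessionContext())

expr = pickle.loads(blob)  # noqa: S301  # trusted driver; consults worker ctx

clear_worker_ctx()  # tear down when the worker is finished
```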

Dependencies:
* `cloudpickle>=2.0` added as a runtime dep. Lazy-imported on the
  encode / decode hot paths — users who never pickle a plan or
  expression pay only the install footprint, not import-time cost.
* ruff `S301` added to the test-suite + examples ignore lists
  (legitimate `pickle.loads` use).

Tests:
* `test_pickle_expr.py` — 11 cases covering built-in expr pickle,
  scalar UDF self-contained blobs, closure-capturing UDFs, worker
  ctx lifecycle, thread-local isolation.
* `test_pickle_multiprocessing.py` + `_pickle_multiprocessing_helpers.py`
  — parametrized over `fork`/`forkserver`/`spawn` start methods. 9
  cases. Auto-skip when the sandbox blocks semaphore creation; CI
  runs the full matrix.
* `test_expr.py` — existing `from_bytes` tests updated to new
  signature.

1088 root tests pass (up from 1077), 13 skipped (up from 4, the new
mp cases skip locally under sandboxed semaphores).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Companion to the pickle work in the previous commit. Ships the
discoverable surface a user would actually reach for when they hit
"how do I distribute these expressions":

* `docs/source/user-guide/io/distributing_expressions.rst` — end-to-end
  user guide covering the recommended `Pool(initializer=...)` pattern,
  the worker context shape, what does and does not survive the
  round-trip (scalar UDFs yes, UDAF/UDWF/FFI by name), Python 3.14
  start-method change, and the cloudpickle security note.
* `examples/ray_pickle_expr.py` — runnable Ray actor demo using
  `set_worker_ctx` from an actor `__init__`.
* `examples/README.md` — links to the Ray example.
* `docs/source/user-guide/io/index.rst` — adds the new page to the
  IO TOC.
* `.github/workflows/test.yml` — 30-minute `timeout-minutes` backstop
  on the test matrix so a hung multiprocessing worker (e.g. during a
  pickle regression) does not block CI indefinitely.
* `python/datafusion/user_defined.py` — `ScalarUDF` / `AggregateUDF` /
  `WindowUDF` get a `.name` property surfacing the registered name.
  Useful for tests asserting an expression carries a specific UDF
  reference, and for users debugging worker registrations.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`input_fields: Vec<Field>` and `volatility: Volatility` were added to
the struct so the codec could read them on encode. Both were
redundant:

* `Signature` already carries the `Vec<DataType>` (via
  `TypeSignature::Exact`) and `Volatility` — the constructor collapses
  the incoming `Vec<Field>` to `DataType`s on its way into the
  signature, so `Field`-level metadata (nullability, attached
  metadata) is never propagated anywhere on the local side.
* On decode, `from_parts` runs that same collapse again. Sender's
  `Signature` and receiver's `Signature` end up with the same
  `DataType`s and the same `Volatility`. The reconstructed
  `PythonFunctionScalarUDF` is functionally equivalent to the
  original without preserving the input-side `Field`s.

Revert the struct to its original 4-field shape (`name`, `func`,
`signature`, `return_field`). The codec now derives the input
`DataType`s from `signature.type_signature` and reads volatility from
`signature.volatility`. Input fields are still serialized into the
cloudpickle payload (with synthesized `arg_i` names) so the wire
format is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previous round-trip went Rust Schema -> pyarrow Schema -> IPC bytes
-> cloudpickle tuple -> pyarrow Schema -> Rust Schema, for both the
input schema and the return Field. Two unnecessary pyarrow trips on
each side.

Replace with `StreamWriter::try_new(&mut buf, &schema)?.finish()?`
on the encoder and `StreamReader::try_new(cursor, None)?.schema()`
on the decoder. Both ends produce / consume the same Arrow IPC
stream bytes — arrow-rs writes a schema-only stream, arrow-rs reads
it back, no PyArrow involvement.
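
The stream is plain Arrow IPC, so it can be sanity-checked from
Python; a hedged illustration (pyarrow used only for inspection here,
not on the codec path):

```python
import pyarrow as pa

schema = pa.schema([pa.field("arg_0", pa.int64())])

# A schema-only IPC stream: the shape arrow-rs's StreamWriter emits.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, schema):
    pass  # no batches written
ipc_bytes = sink.getvalue().to_pybytes()

assert pa.ipc.open_stream(ipc_bytes).schema.equals(schema)
```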

Tuple shape changes slightly: the fourth field is now a one-field
`return_schema_bytes` IPC blob instead of a pickled pyarrow `Field`.
Keeps everything in `Vec<u8>` form before cloudpickle picks it up.

`pyarrow.ipc.read_schema` and the `ToPyArrow` / `FromPyArrow` traits
on `Schema` / `Field` are no longer needed on the codec hot path,
shaving a noticeable chunk of pyarrow function dispatch from each
encode / decode call.

Pickle tests still green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previous wording for `Expr.to_bytes`, `Expr.__reduce__`, and the
`datafusion.ipc` module header referenced ``PythonLogicalCodec`` and
``cloudpickle`` to explain what survives the wire. Neither name is
importable from Python and the mechanism is irrelevant to the end
user — only the resulting contract matters.

Reword each docstring to describe the user-facing guarantee directly:

* Python scalar UDFs travel inside the pickle / serialized blob, no
  pre-registration needed on the receiver.
* Aggregate UDFs, window UDFs, and FFI-capsule UDFs travel by name
  only and require the receiver to have them registered (typically
  via `set_worker_ctx`).

The implementation can change underneath without invalidating these
docs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
User-facing docs throughout this PR led with "pickle support":
filename-shaped headings, function docstrings describing how things
get cloudpickled into a Rust-side codec, etc. That's the
implementation pathway, not the user's goal.

The user's goal is to build an expression in a driver process and
ship it to worker processes for distributed evaluation. Pickle is the
mechanism Python provides to make that work; we hook into it. End
users typically don't care how the bytes are produced — they care
which references survive the trip and what they have to register on
each worker.

Reframe across user-facing surfaces:

* `docs/source/user-guide/io/distributing_expressions.rst` — leads
  with the worker-pool use case, drops `PythonUDFCodec` /
  cloudpickle vocabulary, presents "what travels with the
  expression" as the user contract.
* `datafusion.ipc` module docstring + `set_worker_ctx` /
  `clear_worker_ctx` / `get_worker_ctx` — describes what the user
  installs and why, not internal lookup details.
* `Expr.to_bytes` / `from_bytes` / `__reduce__` — describes what's
  shipped vs what travels by name; cross-references the user guide
  instead of repeating the codec story.
* `examples/ray_pickle_expr.py` header + comment + README entry —
  goal-first wording.
* Pickle test module docstrings — drop the dangling reference to
  `PythonUDFCodec` (also a stale name post-PR1).

Code behavior unchanged. 1088 tests still green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Window UDFs no longer need worker-side pre-registration. The codec
serializes the Python evaluator factory into the wire format and the
receiver reconstructs the UDF from bytes alone, same as scalar UDFs.

Refactor `MultiColumnWindowUDF` to store the Python evaluator
callable directly (`evaluator: Py<PyAny>`) instead of a
`PartitionEvaluatorFactory` closure. The factory closure was a
boxed `Fn` that captured the Python state opaquely, with nothing for
the codec to downcast back to. Now the named struct holds the
`Py<PyAny>` and builds a partition evaluator inside
`partition_evaluator()` on demand.

`PyWindowUDF::new` constructs `MultiColumnWindowUDF` directly with
the evaluator. `to_rust_partition_evaluator` is replaced by
`instantiate_partition_evaluator`, called from the trait method.

Codec wiring:
* `crates/core/src/codec.rs` adds `try_encode_python_window_udf` /
  `try_decode_python_window_udf` plus the `DFPYUDW1` magic prefix.
* `PythonLogicalCodec.try_encode_udwf` / `try_decode_udwf` and the
  matching `PythonPhysicalCodec` methods consult the helpers first
  and fall back to `inner` for non-Python window UDFs.

Test coverage in `test_pickle_expr.py::TestWindowUDFCodec` mirrors
the scalar UDF cases: self-contained blob, decode into fresh
context, decode via pickle with no worker context.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Aggregate UDFs no longer need worker-side pre-registration. The codec
serializes the Python accumulator factory + state schema into the
wire format and the receiver reconstructs the UDF from bytes alone.

New `PythonFunctionAggregateUDF` named struct (in `crates/core/src/udaf.rs`)
holds `accumulator: Py<PyAny>` plus signature, return type, and state
fields directly. Full `AggregateUDFImpl` impl mirroring upstream
`SimpleAggregateUDF`: `as_any`, `name`, `signature`, `return_type`,
`accumulator`, `state_fields`. `accumulator()` lazily instantiates a
fresh accumulator per partition via the new
`instantiate_accumulator()` helper.

`PyAggregateUDF::new` now constructs `PythonFunctionAggregateUDF`
directly via `AggregateUDF::new_from_impl(...)` instead of routing
through `create_udaf(...)` + `to_rust_accumulator(...)`. The closure-
based factory path is gone; the Python state stays addressable.

Codec wiring:
* `crates/core/src/codec.rs` adds `try_encode_python_agg_udf` /
  `try_decode_python_agg_udf` plus the `DFPYUDA1` magic prefix.
  Tuple shape: `(name, accumulator, input_schema_bytes,
  return_schema_bytes, state_schema_bytes, volatility_str)`.
* `PythonLogicalCodec.try_encode_udaf` / `try_decode_udaf` and the
  matching `PythonPhysicalCodec` methods consult the helpers first
  and fall back to `inner` for non-Python aggregate UDFs.

Test coverage in `test_pickle_expr.py::TestAggregateUDFCodec` mirrors
the scalar / window UDF cases.

1094 root tests pass (up from 1088, plus 3 new UDAF cases and 3 new
UDWF cases from the prior commit).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
With aggregate UDFs and window UDFs now reconstructable from bytes
alone, the user-facing contract simplifies to:

* Built-in functions and **all** Python UDFs (scalar, aggregate,
  window) travel inside the shipped expression. No worker-side
  pre-registration.
* Only UDFs imported via the FFI capsule protocol travel by name and
  require pre-registration via `set_worker_ctx`.

Update each user-facing surface:

* `docs/source/user-guide/io/distributing_expressions.rst` — drop the
  "aggregate/window UDFs travel by name only" caveat; rename the
  practical-considerations entry that called out the limitation.
* `python/datafusion/ipc.py` module + `clear_worker_ctx` — explicitly
  list scalar, aggregate, and window as inline-portable.
* `python/datafusion/expr.py` — `to_bytes` and `__reduce__`
  docstrings updated.
* Test module docstrings updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…inds

Two fixes in the intro paragraph:

* Link to the standard library pickle docs rather than relying on the
  reader's familiarity with `pickle.dumps` / `pickle.loads`.
* "Python scalar UDFs ride along" only covered scalar UDFs. With
  aggregate and window UDFs now also traveling inline, the line is
  reworded to call out all three kinds.

Also updates the inline code comment in the worker-pool example.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Pool / Ray-actor examples called `pickle.dumps` on the sender
and `pickle.loads` on the worker explicitly. That's not what real
user code looks like — `multiprocessing.Pool.starmap`, Ray's
`@ray.remote`, and similar frameworks serialize their function
arguments automatically. Showing the manual wrapping makes the API
look more involved than it is and obscures the point: users hand a
DataFusion `Expr` to their distribution framework like any other
Python object, and it Just Works.
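
A hedged sketch of the resulting shape (worker body illustrative; any
framework that pickles its arguments behaves the same, and the driver
lines belong under a `__main__` guard when the start method is
`spawn` / `forkserver`):

```python
import multiprocessing as mp

import pyarrow as pa
from datafusion import SessionContext, col, lit

def evaluate(expr, batch):
    # expr arrives reconstructed; the Pool pickled and unpickled it.
    ctx = SessionContext()
    return ctx.create_dataframe([[batch]]).select(expr).collect()

batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
expr = (col("a") * lit(2)).alias("doubled")

with mp.Pool(2) as pool:
    results = pool.starmap(evaluate, [(expr, batch)])
```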

Rewrites:

* User guide worker-pool example switches from
  `pool.map(evaluate, [(blob, batch), ...])` (where `blob =
  pickle.dumps(expr)`) to `pool.starmap(evaluate, [(expr, batch),
  ...])`. `evaluate(expr, batch)` receives the reconstructed
  expression directly.
* Ray example drops the `pickle.dumps(expr)` / `pickle.loads(blob)`
  pair; `evaluate(expr, batch)` takes a typed `Expr`. Drops the
  unused `pickle` import.
* Worker-context narrative updated: "expressions reconstructed by
  pickle.loads" -> "expressions arriving from the driver".
* Security warning reworded to mention pickle as the underlying
  mechanism while still framing the contract in user terms (only
  accept expressions from trusted sources).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The page previously framed itself entirely around expression-level
distribution. With datafusion-distributed and datafusion-ballista
integrations in progress upstream, an overview of *all* distribution
approaches is more durable: it establishes the page as the
distribution landing spot, sets reader expectations about what is
ready today versus what is on the way, and lets the future
integrations slot in without renaming or restructuring.

Rename `distributing_expressions.rst` → `distributing_work.rst` and
rewrite as:

* Overview lead — three approaches (expression-level, query-level via
  datafusion-distributed, query-level via Ballista) with status
  markers.
* "Expression-level distribution" — the existing content, slotted in
  as a sub-section.
* "Query-level distribution via datafusion-distributed" — placeholder
  noting the upstream WIP and that the integration will be documented
  here once usable.
* "Query-level distribution via Apache Ballista" — same.

Cross-references in `datafusion.ipc` and `Expr.to_bytes` /
`__reduce__` docstrings updated to the new doc name. TOC entry in
io/index.rst updated. Filename and URL stable from here on.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`pickle-redesign-plan.md` is an off-tree working doc for PR1/PR2
sequencing decisions, not user-facing documentation. It was added by
a stray `git add -A` in the previous commit. Untrack it; the file
stays in the working tree as untracked for local reference.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The "multi-column" name was a relic of an earlier upstream limitation
where `SimpleWindowUDF` only accepted a single input column. With the
struct now also storing the Python evaluator factory directly for
pickle support, the relevant distinction is no longer column count
but "Python-defined". Rename to match `PythonFunctionScalarUDF` and
`PythonFunctionAggregateUDF` for a consistent naming convention
across all three Python UDF kinds.

Also tighten visibility from `pub` to `pub(crate)`. No external
consumer; the struct only needs to be reachable from `PyWindowUDF`
and the codec.

No functional change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…te style

Other code blocks in the user guide present snippets inline at module
level; the worker-pool example was the only one using
``if __name__ == "__main__":``. Restructure as two blocks (worker
function + driver code), both inline, with a prose note explaining
when the guard is actually needed (saving to a .py file and running
under ``spawn`` / ``forkserver``). Matches the look of the surrounding
docs and keeps the snippet copy-pasteable for the interactive case.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds `SessionContext.with_python_udf_inlining(enabled)` for two
related use cases:

* **Cross-language portability.** With inlining disabled, the codec
  no longer emits `DFPYUDF1` / `DFPYUDA1` / `DFPYUDW1` cloudpickle
  blobs. Python UDFs travel by name only, the same way FFI-capsule
  UDFs do. Bytes round-trip through a non-Python decoder.
* **Untrusted-source decode.** `Expr.from_bytes` on bytes from a
  misbehaving sender no longer invokes `cloudpickle.loads`. Inline
  payloads received by a strict decoder raise a clear error.

`PythonLogicalCodec` and `PythonPhysicalCodec` gain a
`python_udf_inlining: bool` field (default `true`) and a builder
method `with_python_udf_inlining(enabled)`. The six UDF
encode/decode dispatchers consult the flag before calling the inline
helpers. Strict decoders that see a magic-prefix payload return a
clear `Plan` error rather than silently failing through to the inner
codec (which would otherwise produce "LogicalExtensionCodec is not
provided" — accurate but unhelpful).

`PySessionContext::with_python_udf_inlining(enabled)` rebuilds both
codecs with the new setting; Python wrapper at
`SessionContext.with_python_udf_inlining` mirrors. Test coverage:
encoder size delta, strict roundtrip via registry,
clear-error-on-inline-payload-when-strict.

`pickle.loads` on untrusted bytes remains unsafe regardless of this
flag; the toggle only governs the `to_bytes` / `from_bytes` codec
path. User guide documents both use cases plus the limitation.
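
A hedged usage sketch (`expr` stands in for an expression that
references a Python `udf()`; the keyword form of the toggle and the
`ctx` keyword on `to_bytes` are assumed from the descriptions below):

```python
from datafusion import Expr, SessionContext

strict = SessionContext().with_python_udf_inlining(enabled=False)

inline_blob = expr.to_bytes()            # default codec: UDF travels inline
strict_blob = expr.to_bytes(ctx=strict)  # by-name form, no cloudpickle blob

# A strict receiver refuses the inline payload with a clear error
# instead of invoking cloudpickle.loads:
Expr.from_bytes(inline_blob, ctx=strict)  # raises
```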

1097 root tests pass (up from 1094 with 3 new strict-mode cases).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
to Python wrapper

Two changes:

* Reference link to the pickle module's official security warning
  in `https://docs.python.org/3/library/pickle.html#module-pickle`.
  Added in the user guide ("Disabling Python UDF inlining" note and
  the Security warning block) and in the Python
  `SessionContext.with_python_udf_inlining` docstring. The
  unqualified phrase "pickle is unsafe on untrusted input" assumed
  reader background that not every datafusion-python user has.

* Strip the user-facing prose docstring from the Rust
  `PySessionContext::with_python_udf_inlining` method. Python
  wrappers are what users see via `help()` and Sphinx; the Rust
  doc-comment duplicated the same text and risked drifting from the
  Python version. Matches the surrounding methods
  (`with_logical_extension_codec`, `with_physical_extension_codec`)
  which carry no Rust doc-comment for the same reason.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The actor was calling `ctx.register_udf(...)` and `set_worker_ctx(ctx)`
to make the inbound expression's UDF resolvable on the worker. With
Python scalar/aggregate/window UDFs now traveling inside the
serialized expression, neither call is necessary — the actor just
needs a `SessionContext` to evaluate against.

Also drops the parallel `sender.register_udf(...)` in the driver; an
expression built with a `udf(...)` callable carries its own reference
and does not require the UDF to be registered on the driver session.

Result: each actor is a few lines (one `SessionContext`, one
`evaluate` method) — what the inline-UDF story is actually trying
to demonstrate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
timsaucer (Member Author)

FYI @ntjohnson1

timsaucer and others added 11 commits May 15, 2026 10:06
`SessionContext.with_python_udf_inlining(False)` previously had no
effect on `pickle.dumps(expr)` because `__reduce__` called
`to_bytes()` without a context. Add a thread-local sender slot
(`datafusion.ipc.set_sender_ctx`) that `__reduce__` consults, so the
inlining toggle flows through pickle. Symmetric to the existing
worker slot.

Also switch the worker-side decode fallback from a freshly
constructed `SessionContext` to `SessionContext.global_ctx()` so
expressions arriving with no explicit/worker context can still
resolve names registered on the global singleton.
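
A hedged sketch of the sender side (`expr` is a placeholder):

```python
import pickle

from datafusion import SessionContext
from datafusion.ipc import set_sender_ctx

strict = SessionContext().with_python_udf_inlining(enabled=False)
set_sender_ctx(strict)     # thread-local slot consulted by Expr.__reduce__
blob = pickle.dumps(expr)  # now honors the inlining toggle
```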

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`>>> # doctest: +SKIP` on its own line is not valid — doctest
directives must attach to an example line. CI runs `pytest
--doctest-modules` and `pyproject.toml` includes `python/datafusion`
in testpaths, so the bad directive aborted collection across every
Python version.

Replace the two illustrative `>>>` blocks with `.. code-block::
python` since they were never meant to execute under doctest.
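
For reference (`heavy_setup` is a placeholder):

```python
# Valid: the directive attaches to an example line.
>>> heavy_setup()  # doctest: +SKIP

# Invalid: a bare directive has no example to attach to and aborts
# collection.
>>> # doctest: +SKIP
```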

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Ray example fans the same expression out to every actor — it
demonstrates pickling works but doesn't show why expression
pickling is necessary (a by-name registration on each worker would
suffice for that pattern).

This example builds a list of parametric expressions in the driver,
each closing over a different threshold value, and ships one per
worker via `multiprocessing.Pool`. The closure state forces the
cloudpickle path: a by-name registration on the worker would
collapse every threshold into the same callable and lose the
per-task value. Workers return results plus their PID so the driver
output makes the cross-process distribution visible.
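
A hedged sketch of the driver-side construction (`udf()` argument
order assumed from the datafusion-python API; names illustrative):

```python
import pyarrow as pa
from datafusion import udf

def make_over(threshold: int):
    def over(arr: pa.Array) -> pa.Array:
        # Closure state: each UDF captures its own threshold.
        return pa.array(
            [v.as_py() is not None and v.as_py() > threshold for v in arr]
        )
    return udf(over, [pa.int64()], pa.bool_(), "immutable",
               name=f"over_{threshold}")

# One expression per worker; by-name registration would collapse these
# into a single callable and lose the per-task threshold.
over_udfs = [make_over(t) for t in (10, 20, 30)]
```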

Also mixes a scalar UDF in an aggregate and a pure UDAF so both
kinds round-trip through pickle. README and the distributing-work
"See also" section now link the example.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`encode_python_agg_udf` wrote the full state schema (names,
nullability, key/value metadata) to its IPC payload, but
`decode_python_agg_udf` collapsed it to `Vec<DataType>` and
`PythonFunctionAggregateUDF::from_parts` re-synthesized field names
as `"{i}"`. An aggregate UDF arriving via FFI capsule with named
state fields would lose those names on round-trip; a downstream
`StateFieldsArgs` consumer keyed off `field.name()` or
`field.nullable()` would see different output before vs. after
serialization.

Decode the IPC schema into `Vec<FieldRef>` and feed it directly to
`from_parts`, which now takes fields instead of types. The Python
construction path still synthesizes names through `new` — that
codepath only receives `Vec<DataType>` from PyArrow, so there is
nothing better to preserve.

Add an end-to-end pickle round-trip test that runs the decoded
UDAF and merges across partitions, exercising the rebuilt state
schema.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`format!("{:?}", signature.volatility).to_lowercase()` coupled the
Python UDF wire format to upstream `Volatility`'s `Debug` repr. If
DataFusion ever changes that repr (renames a variant, adds a field,
adopts non-lowercase casing), encoded bytes would silently fail to
decode — `parse_volatility` only accepts "immutable" / "stable" /
"volatile".

Replace all three sites (scalar / window / aggregate encoders) with
a `volatility_wire_str(Volatility) -> &'static str` helper that
exhaustively matches the three variants. The exhaustive match also
turns a future upstream `Volatility` variant addition into a
compile error instead of a runtime decode failure.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`Python::attach(|py| self.func.bind(py).hash().unwrap_or(0))`
silently collapsed every unhashable closure (e.g. a lambda with an
unhashable captured value) to the same hash bucket — the worst case
for hashmap lookups, and arrived at via `unwrap_or` rather than a
deliberate design choice.

Hash on the identifying header (name + signature + return field /
state fields) only. The Rust `Hash` contract requires
`a == b ⇒ hash(a) == hash(b)`, not the converse, so a coarser hash
is still sound — `PartialEq` (which still calls Python `eq` on the
callable) disambiguates two UDFs that share a header but differ in
their callable. Applied symmetrically to scalar, aggregate, and
window UDFs.
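
The same contract in Python terms; a hedged analogue of the scheme:

```python
class UdfLike:
    def __init__(self, name, signature, callable_):
        self.name, self.signature, self.callable_ = name, signature, callable_

    def __hash__(self):
        # Coarse: header only. The callable is never hashed, so an
        # unhashable closure can no longer collapse every UDF into
        # one bucket.
        return hash((self.name, self.signature))

    def __eq__(self, other):
        # Fine: still consults the callable, disambiguating two UDFs
        # that share a header. a == b still implies hash(a) == hash(b).
        return (
            (self.name, self.signature) == (other.name, other.signature)
            and self.callable_ == other.callable_
        )
```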

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The six encode/decode helpers re-resolved `cloudpickle` on every
call. `py.import` is backed by `sys.modules` so each lookup is
cheap, but a plan with many Python UDFs pays the dict-walk + bind
cost per UDF for no reason. Cache the module once in a
`OnceLock<Py<PyAny>>` and hand out a fresh `Bound` against the
caller's GIL token.

Race-safe: two threads can both miss and import, but CPython
already deduplicates the import via `sys.modules` and the losing
`set` still leaves the winning value in the slot, so both threads
return the same module.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…af/udwf

`PythonFunctionScalarUDF::from_parts` previously took `Vec<Field>`
for inputs only to call `.data_type()` on every entry inside `new` —
nullability and metadata went straight in the bin because
`Signature::exact` cannot represent them. The codec carried full
Field info through IPC but threw it away one frame later.

Take `Vec<DataType>` instead. The codec extracts data types from the
decoded IPC schema and feeds them in directly. `return_field` is
still a `Field` so per-output nullability/metadata round-trips
intact, the same way `state_fields` is preserved on the aggregate
side. The Python constructor path (`PyScalarUDF::new`) still goes
through `new` with `Vec<Field>` since PyArrow already hands it the
full Field list.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The dependency comment said "scalar UDF callables" but cloudpickle
has covered aggregate and window UDFs since 4b51402 / 89d119f.
Also note the OnceLock import cache landed in b5684bc so callers
pay one import per process, not one per UDF.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`<[u8]>::starts_with` on an empty buffer with a non-empty prefix
already returns false, so the `buf.is_empty() || !buf.starts_with(MAGIC)`
guard in each `try_decode_python_*_udf` function reduces to just
`!buf.starts_with(MAGIC)`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`PyExpr::to_bytes` returns `Bound<PyBytes>`, which crosses the FFI
boundary as Python `bytes`. The defensive `bytes(...)` wrap was a
no-op.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
timsaucer and others added 28 commits May 15, 2026 10:52
Four test methods re-imported `Expr` from `datafusion` despite the
module top already importing it on line 35. Removed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Encoders for scalar, aggregate, and window UDFs build IPC input
schemas from `Field::new(format!("arg_{i}"), dt, true)` — synthetic
names, unconditional nullability. Add a comment at each site
explaining that the field wrapper is only a transport for the
`DataType`: the receiver immediately collapses these fields back to
`Vec<DataType>` when reconstructing `Signature::Exact`, which cannot
encode names or nullability. Setting realistic values here would
be discarded on decode.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`refuse_inline_payload` was returning `DataFusionError::Plan`, but the
refusal happens at wire-format decode time, not during planning.
Downstream error classification keys off the variant; surfacing this as
a planner error mis-routes it into "fix your SQL" buckets when the real
problem is that the receiver opted out of inline Python UDF decoding.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The cloudpickle module handle was cached in `OnceLock<Py<PyAny>>`, a
process-lifetime slot. The cached `Py` was rooted in the interpreter
that won the first-import race; under CPython subinterpreters (PEP 684),
a teardown of that interpreter would have left the slot pointing at a
dangling reference for the next call.

Swap to `pyo3::sync::PyOnceLock`, which scopes the cached `Py` to the
current interpreter and drops it cleanly on interpreter teardown. The
body collapses to a single `get_or_try_init` call — no more manual
race-handling between `get` and `set`. Same call sites, same cost
profile, no downstream churn.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`_multiprocessing_available()` was called at module import time, which
meant every pytest collection — including unrelated runs that never
execute this module — paid the cost of spawning a probe Pool. In
sandboxed environments that deny semaphore creation, this also surfaced
an irrelevant traceback during collection.

Wrap the probe in `functools.cache` and call it from an autouse fixture
that emits `pytest.skip` when the Pool can't start. The probe now runs
at most once per session, and only when a test in this module is about
to execute; `--collect-only` no longer triggers it.
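
A hedged sketch of the pattern:

```python
import functools
import multiprocessing

import pytest

@functools.cache
def _multiprocessing_available() -> bool:
    try:
        with multiprocessing.Pool(1):
            pass
    except OSError:  # sandboxed semaphore denial surfaces as OSError
        return False
    return True

@pytest.fixture(autouse=True)
def _require_multiprocessing():
    # Probe runs at most once per session, and only when this
    # module's tests actually execute.
    if not _multiprocessing_available():
        pytest.skip("multiprocessing unavailable: semaphore creation denied")
```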

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`from_parts` was a verbatim wrapper over `new` with an identical
signature, kept only for parity with the scalar / aggregate variants
where `from_parts` accepts richer state (return `Field`, state
`Vec<FieldRef>`) than `new`. Window UDFs have no such asymmetry, so
the indirection added no value.

Call `PythonFunctionWindowUDF::new` from the codec directly. Same
behavior, one fewer hop.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Python constructor path collapses input `Vec<DataType>` into
synthesized `FieldRef`s; bare integer names (`"0"`, `"1"`) made EXPLAIN
and debug output for the state schema opaque. Use `state_{i}` to match
the `arg_{i}` convention the codec applies on the input-field side.

The post-decode path is unaffected — the codec ships full
`FieldRef`s (with the names the sender chose) through IPC and bypasses
this synthesis entirely.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`PartialEq` was acquiring the GIL on every comparison via
`Python::attach`, just to call `__eq__` on the stored callable. Most
equality checks compare `Arc`-shared clones of the same UDF
(expression rewriting, plan diffing), where pointer identity already
settles the question — no Python call needed.

Add `Py::as_ptr` pointer comparison before the `Python::attach` branch
on all three impls (scalar / aggregate / window). Falls through to the
GIL-bound `__eq__` only when the two callables are distinct objects.

No behavior change for unequal-pointer cases; the equal-pointer case
now skips the GIL roundtrip entirely.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The `(v.as_py() or 0) > threshold` line read as canonical Python UDF
style. Add a one-line comment flagging that nulls silently coerce to
0: fine for the demo dataset (no nulls), but real code should pick
the semantics explicitly.
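
For contrast, a hedged sketch of choosing the semantics explicitly
(`v` / `threshold` as in the example):

```python
# Demo shorthand: None coerces to 0, so null rows silently fail the test.
keep = (v.as_py() or 0) > threshold

# Explicit alternative: state what nulls should do.
val = v.as_py()
keep = val is not None and val > threshold
```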

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`pytest.raises(Exception, ...)` matched too loosely — a regression that
swallowed the codec refusal as some other error type would have passed
the test. The actual surface error is `RuntimeError` (the
`parse_expr` → `PyRuntimeError` conversion in `expr.rs::from_bytes`).
Switch to the concrete type and add a brief comment explaining why.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The doc on `with_python_udf_inlining(false)` said strict mode "rejects
cloudpickle.loads on untrusted from_bytes input", which could be misread
as making `pickle.loads(untrusted)` safe. It does not.

Strict mode only narrows the codec layer: it stops `Expr::from_bytes`
from invoking `cloudpickle.loads` on the inline `DFPY*` payload. The
outer pickle stream is still arbitrary code — `pickle.loads` honors any
`__reduce__` the bytes name, and an attacker is free to choose one.

Spell that out in the doc so callers don't treat the toggle as a
substitute for "never pickle.loads untrusted input." The Python-side
docstring (`SessionContext.with_python_udf_inlining`) already carries
the equivalent caveat; this brings the Rust side in line.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Prior regeneration used an older uv that stripped `revision = 2` and
every upload-time field: 800+ lines of noise unrelated to the
feature. The lockfile is now back to a minimal +12/-1 delta covering
only the cloudpickle dependency addition.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Body asserts the decoded expression resolves successfully — the
previous "refused" name described the opposite outcome and would
mask a real refusal regression in code review.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Forked workers inherit the parent's `threading.local()` slots via
copy-on-write until first write, so a child can silently observe a
sender or worker context the parent set. Spell that out next to the
existing thread-local caveat and tell readers to (re)initialize the
slot in the worker rather than rely on inherited state.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The strict-mode wire format ships Python-defined UDFs by name; the
existing coverage only exercised that path with `PythonFunctionScalarUDF`
instances. This test uses the FFI `IsNullUDF` so the codec falls
through `try_encode_python_scalar_udf` (downcast fails) and the
default by-name encoding actually runs. Covers explicit
`Expr.to_bytes` / `from_bytes` plus the `pickle.dumps` / `pickle.loads`
route via the sender / worker thread-local slots, and asserts the
strict blob is materially smaller than an inline Python-UDF blob —
catching a regression that silently falls back to inlining for a
non-Python UDF.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…r arg

Previously `ScalarUDF.name`, `AggregateUDF.name`, and `WindowUDF.name`
cached the `name` argument passed to `__init__`. The FFI-capsule branch
discards that argument and constructs from the capsule directly, so the
cached value was wrong — e.g. `udf(IsNullUDF()).name` returned
`"<class 'datafusion_ffi_example.IsNullUDF'>"` (the factory-synthesized
string from `from_pycapsule`) rather than the capsule's own
`my_custom_is_null`.

Expose a `name` getter on `PyScalarUDF`, `PyAggregateUDF`, `PyWindowUDF`
that forwards to the underlying `UDF::name()`, and have each Python
property delegate to it. Same path for Python-defined and
FFI-imported UDFs, no stale cache.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`ctx.with_python_udf_inlining(True)` and
`ctx.with_python_udf_inlining(False)` read identically at the call site
— a bare positional `True` / `False` does not convey which direction the
toggle moves. Forcing `enabled=` at the call site disambiguates:
`with_python_udf_inlining(enabled=False)` is unambiguously the
opt-out form.

The Rust constructor stays positional (internal API); only the public
Python wrapper changes signature. All in-tree callers (tests, docs,
FFI example test) already used or now use the keyword form.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…coders

Scalar, aggregate, and window UDF encode/decode bodies in codec.rs each
contained the same five blocks:

* a `match TypeSignature::Exact { ... } else err` to extract input
  dtypes,
* an `arg_{i}` input-field synthesis with a verbatim 6-line comment,
* a `Schema::new(vec![Field::new(...)])` for the return type,
* a `schema_from_ipc_bytes(...).first().ok_or_else(...)` decode of the
  single-field return blob, and
* a `parse_volatility(&volatility_str).map_err(...)` round-trip.

Six near-identical bodies meant the same comment text lived in three
places, and each `map_err(|e| PyValueError::new_err(format!("{e}")))`
chain appeared a handful of times per body. Extract:

* `signature_input_dtypes(sig, kind)` — `Signature::Exact` extraction
  with a flavor-tagged error.
* `build_input_schema_bytes(&[DataType])` — synth `arg_{i}` fields and
  write IPC. Carries the comment explaining why field metadata is
  discarded on decode.
* `build_single_field_schema_bytes(&Field)` /
  `build_schema_bytes(Vec<Field>)` — IPC writer wrappers.
* `read_input_dtypes(&[u8])` /
  `read_single_return_field(&[u8], kind)` — decode side.
* `arrow_to_py_err(ArrowError) -> PyErr` and `parse_volatility_str` to
  collapse the repeated `map_err` chains.

Wire format unchanged. Six encode/decode bodies collectively shrink
from ~300 to ~140 LOC and stop carrying triplicated comment text that
would otherwise drift independently.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… byte

The wire-format magics `DFPYUDF1` / `DFPYUDA1` / `DFPYUDW1` encoded the
version in the trailing byte but the decoder did nothing with it: a
payload tagged for a future tuple shape would either skip the prefix
check (and fall through to `inner`) or, if the prefix still matched,
detonate inside `cloudpickle.loads` with an opaque tuple-unpack error.

Split the framing into a 7-byte family prefix plus a 1-byte version:

* Family magics: `DFPYUDF` (scalar), `DFPYUDA` (aggregate), `DFPYUDW`
  (window).
* `WIRE_VERSION_CURRENT: u8 = 1` — version emitted by this build.
* `WIRE_VERSION_MIN_SUPPORTED: u8 = 1` — oldest version this build
  decodes; raise when retiring a payload shape.
* `write_wire_header(buf, family)` and
  `strip_wire_header(buf, family, kind) -> Result<Option<&[u8]>>`
  centralize the framing. The decoder returns `Ok(None)` when the
  family prefix is absent (fall through to `inner`), `Ok(Some(payload))`
  inside the supported version range, and an
  `Execution("Inline Python {kind} payload wire-format version vN, this
  build supports vMIN..=vCURRENT. Align datafusion-python versions on
  sender and receiver.")` outside it.

Wire format is unreleased, so the rename of the magic constants is
rename-grade not BC-break-grade. Six unit tests cover the helper:
family absent → fall through; truncated version byte; too-new version;
too-old version (when applicable); successful round-trip; family
mismatch.

Module doc updated with the new framing layout and a version-bump
policy.
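
A hedged Python transcription of the helper's behavior (the real
implementation is the Rust `strip_wire_header`):

```python
WIRE_VERSION_MIN_SUPPORTED = 1
WIRE_VERSION_CURRENT = 1

def strip_wire_header(buf: bytes, family: bytes) -> bytes | None:
    if not buf.startswith(family):
        return None  # family absent: fall through to the inner codec
    if len(buf) < len(family) + 1:
        raise ValueError("truncated wire header: missing version byte")
    version = buf[len(family)]
    if not WIRE_VERSION_MIN_SUPPORTED <= version <= WIRE_VERSION_CURRENT:
        raise ValueError(
            f"inline payload wire-format version v{version}; this build "
            f"supports v{WIRE_VERSION_MIN_SUPPORTED}..=v{WIRE_VERSION_CURRENT}"
        )
    return buf[len(family) + 1 :]  # payload within the supported range
```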

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous refusal text told the operator to "re-encode the bytes
with inlining enabled" — but the operator seeing this error is the
receiver, whose session has inlining off. The receiver cannot re-encode
bytes it did not produce; only the sender can.

Reword to direct the fix at the side that can apply it: ask the sender
to re-encode with inlining disabled (the by-name form a strict
receiver actually accepts), or register the UDF on the receiver and
enable inlining on both sides. Phrase preserves the
"inlining is disabled" substring the strict-mode test asserts against.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… unequal

`PartialEq` on `PythonFunctionScalarUDF` / `PythonFunctionAggregateUDF` /
`PythonFunctionWindowUDF` used `unwrap_or(false)` when the underlying
Python `__eq__` raised. Two same-callable UDFs whose `__eq__` happened
to throw — e.g. a deliberately strict `__eq__` that runs validation —
would compare unequal silently, breaking expression dedup and cache
lookups without leaving any trace for an operator to investigate.

Rust's `PartialEq` cannot return `Result`, so `false` remains the
conservative choice (better to over-distinguish than wrongly merge),
but the exception is now logged at `log::debug` with the UDF's
registered name and the exception text. Mark each call site with a
`FIXME` pointing at the upstream `*UDFImpl` traits — when those expose
a fallible `PartialEq` we can drop the fallback and surface the error.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous docstring on Expr.to_bytes called out the no-ctx case
("Without ctx a default codec is used"). The pickle-support rewrite
replaced the docstring with a description of what travels inside the
bytes but dropped the sentence that answered the immediate question
a reader has when they see the optional `ctx` parameter.

Re-add a one-paragraph explanation: with `ctx`, encoding routes
through the session's installed LogicalExtensionCodec — so
`with_python_udf_inlining` takes effect; without `ctx`, the default
codec is used (Python UDF inlining on, no user-installed extension
codec).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The `# noqa: PLC0415` flag silenced ruff's "import outside top-level"
lint but didn't say why the import was placed there in the first place.

Add a comment naming the actual reason: `datafusion/__init__.py`
imports `datafusion.ipc` before `datafusion.context`, so a module-top
import would force `datafusion.context` to load mid-init of
`datafusion.ipc`. The cycle is benign today (`context.py` only pulls
`expr.py` at module scope, and neither pulls `ipc.py` back), but a
single new import added to `context.py`'s transitive dependency tree
could turn this into a real circular import. Deferring keeps
`datafusion.ipc` import-order-independent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The pickle-Expr work added `S301` (suspicious-pickle-usage) to the
`per-file-ignores` for both `python/tests/*` and `examples/*`. That
silenced ruff's pickle-usage warning everywhere in those trees, not
just at the call sites that genuinely need it — a future test that
adds an unrelated `pickle.loads` on untrusted bytes would pass review
without ever pinging the linter.

Drop the blanket allows and tag each intentional `pickle.loads` call
with a per-line `# noqa: S301` instead (12 sites total: 9 in
`test_pickle_expr.py`, 2 in `_pickle_multiprocessing_helpers.py`,
1 in the FFI example test). New `pickle.loads` calls now have to
justify themselves with their own noqa, keeping S301 a real signal.

Production code (`Expr.__reduce__`) only calls `pickle.dumps`, which
S301 does not cover, so no noqa is needed outside tests/examples.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`test_strict_decoder_refuses_inline_payload` registered the UDF on
the strict receiver, but the codec refusal trips before the receiver's
function registry is ever consulted — the registration was never
exercised. Worse: keeping it there would mask a real regression that
moved the strict-mode check downstream of registry lookup, because the
registry hit would resolve the UDF instead of letting the refusal
surface.

Drop the register_udf call and call out in the docstring why the
absence is load-bearing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The strict-mode tests exercise inline→strict but not the reverse:
`with_python_udf_inlining(enabled=False).with_python_udf_inlining(enabled=True)`
was idempotent in code but never verified, so a future change that
mutated shared codec state instead of cloning (e.g. interning the
codec across cloned contexts) would silently leave the second context
in strict mode without any test tripping.

Add `test_toggle_off_then_on_restores_inline_encoding`: encode the
same expression once via a fresh `SessionContext` and once via a
context toggled off-then-on, assert the blobs are byte-identical, and
decode the toggled-back blob through a fresh context to confirm the
payload is self-contained inline (not a strict by-name payload that
needs registry resolution).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous wording said Python 3.14 changed the "POSIX default" from
fork to forkserver, but macOS (also POSIX) has defaulted to `spawn`
since Python 3.8 — only Linux was using `fork` as default, and only
Linux flipped to `forkserver` in 3.14. Windows has always been
`spawn`.

Replace "POSIX" with "Linux" and add a parenthetical naming the
defaults on the other platforms so readers don't infer the wrong
platform coverage from the change note.
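
Code that cares should pin the start method rather than rely on the
platform default; a hedged sketch (`init_worker` as in the earlier
worker-context sketch):

```python
import multiprocessing as mp

# Linux: fork (<= 3.13), forkserver (3.14+); macOS and Windows: spawn.
ctx = mp.get_context("forkserver")
with ctx.Pool(processes=4, initializer=init_worker) as pool:
    ...
```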

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The pickle-on-untrusted-input warning was written out three times: in
the `PythonLogicalCodec::with_python_udf_inlining` rustdoc, in the
matching Python `SessionContext.with_python_udf_inlining` docstring,
and in two places inside `distributing_work.rst` (a free-floating
paragraph at the end of the inlining section plus the dedicated
Security warning block). Three copies of the same load-bearing text
would inevitably drift.

Pick `distributing_work.rst` Security section as canonical:

* Keep the Security warning block intact — it is the single source of
  truth.
* Trim the redundant "Note that pickle.loads itself remains unsafe..."
  paragraph above it to a one-line summary + Sphinx cross-reference
  (`Security`_) so the section header still anchors the link.
* Replace the Python docstring's 5-line warning paragraph with a
  one-sentence summary + `:doc:` link to `distributing_work`.
* Replace the rustdoc warning with a one-sentence summary, a relative
  pointer to `docs/source/user-guide/io/distributing_work.rst`, and a
  link to the upstream Python pickle module security warning so the
  rustdoc remains self-contained for someone reading just the crate.

Behavior unchanged; the strict-toggle test continues to assert on the
substring "inlining is disabled", which lives in `refuse_inline_payload`
(separate from these docs).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>