[SPARK-57489][SS] Deterministic dropDuplicates key resolution via shared analyzer rule #56546

Open
HeartSaVioR wants to merge 8 commits into apache:master from HeartSaVioR:dropduplicates-deterministic-key-order

@HeartSaVioR

Contributor

What changes were proposed in this pull request?

dropDuplicates / dropDuplicatesWithinWatermark historically resolve their key columns eagerly: in the DataFrame implementation for Spark Classic (Dataset.groupColsFromDropDuplicates, via colNames.toSet.toSeq) and in the Spark Connect planner (transformDeduplicate). The Spark Classic path's key order therefore depends on Scala's Set iteration order, which is undocumented and changed between Scala 2.12 and 2.13, and which also differs from Spark Connect. Because streaming deduplication binds state-store keys by position, this non-determinism can break query restarts.

This PR keeps positional binding, but makes key resolution deterministic and shared between engines:

  • Add an unresolved logical node UnresolvedDeduplicate and an analyzer rule ResolveDeduplicate that both Spark Classic (Dataset.dropDuplicates*) and Spark Connect (transformDeduplicate) defer to. The rule resolves the requested columns with an explicit, order-preserving dedup (intentionally NOT Seq.distinct/Set, whose ordering is not contractually guaranteed), so the key order is deterministic and identical across Scala versions and engines.
  • Add spark.sql.dropDuplicates.deterministicKeyOrder.enabled (default true), persisted via the offset log (OffsetSeqMetadata.relevantSQLConfs, with legacy default false), so the deterministic order applies only to newly started streaming queries; existing checkpoints keep their original key order. When the conf is off, the rule reproduces each engine's legacy resolution (Classic toSet, Connect identity), selected by the node's legacyDedupColumnNames.
  • Resolve the node under the offset-log-pinned conf during initial analysis via a bootstrap in StreamingQueryManager.createQuery (only when an UnresolvedDeduplicate is present), mirroring how other initial-analysis confs are pinned per query.
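To make the difference between the two resolution styles concrete, here is a minimal sketch of a first-occurrence dedup next to the legacy `toSet.toSeq` shape. It works on plain column-name strings with exact string equality; the object and method names are hypothetical, and the actual `ResolveDeduplicate` rule presumably works on resolved attributes rather than raw names.

```scala
import scala.collection.mutable

// Hypothetical illustration only; not the code from this PR.
object DedupKeyOrder {
  // Explicit first-occurrence dedup: keeps the first appearance of each name
  // and never depends on the iteration order of a hash-based Set.
  def firstOccurrence(colNames: Seq[String]): Seq[String] = {
    val seen = mutable.HashSet.empty[String]
    val ordered = Seq.newBuilder[String]
    colNames.foreach { name =>
      // HashSet.add returns true only when the element was not already present.
      if (seen.add(name)) ordered += name
    }
    ordered.result()
  }

  // The legacy Classic shape described above: the resulting order is whatever
  // the Set implementation happens to iterate in.
  def legacyClassic(colNames: Seq[String]): Seq[String] = colNames.toSet.toSeq
}
```

With this sketch, `DedupKeyOrder.firstOccurrence(Seq("c", "a", "c", "b", "a"))` yields `c, a, b` on any Scala version, while the order returned by `legacyClassic` is left to the `Set` implementation.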

Why are the changes needed?

Streaming deduplication binds state-store keys by position. A key order that depends on Scala's Set iteration order is non-deterministic across Scala versions and differs between Spark Classic and Spark Connect, which can break streaming query restarts and cross-engine checkpoint compatibility.
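A toy model of positional binding may help show why key order matters across restarts. This is only a sketch: the key is modelled as a sequence of strings, and `encodeKey` / `decodeKey` are hypothetical helpers, not the state-store encoding Spark actually uses.

```scala
// Hypothetical illustration only; not Spark's state-store format.
object PositionalKeyBinding {
  // Encode a row's key values in the given key-column order.
  def encodeKey(row: Map[String, String], keyOrder: Seq[String]): Seq[String] =
    keyOrder.map(row)

  // Decode by position: the i-th stored value is attributed to the i-th column.
  def decodeKey(stored: Seq[String], keyOrder: Seq[String]): Map[String, String] =
    keyOrder.zip(stored).toMap
}
```

If a key is written with order `(user, device)` and a restarted query decodes it with `(device, user)`, every value is attributed to the wrong column. With differently typed columns the mismatch would show up as a schema error instead.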

Does this PR introduce any user-facing change?

Yes. For newly started streaming dropDuplicates/dropDuplicatesWithinWatermark queries, the key columns are resolved in a deterministic, first-occurrence order instead of a Scala-Set-dependent order. Deduplication results (rows) are unchanged; only the internal key ordering used for the state store changes. Existing checkpointed queries are unaffected, because the effective order is pinned per query via the offset log. Spark Classic and Spark Connect now resolve dropDuplicates keys identically, and in Spark Classic an unresolvable dropDuplicates column now fails at query analysis rather than at the API call.
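The pinning behaviour can be sketched as a small lookup. This models the session confs and the offset-log confs as plain maps; the `effective` function and its parameters are hypothetical and only encode the defaults stated in the description (session default `true`, legacy default `false` for checkpoints that predate the conf).

```scala
// Hypothetical illustration only; not the OffsetSeqMetadata API.
object PinnedConf {
  val Key = "spark.sql.dropDuplicates.deterministicKeyOrder.enabled"
  val SessionDefault = true  // default of the new conf, per the description
  val LegacyDefault = false  // assumed when a checkpoint has no entry for the conf

  // offsetLogConfs is None for a brand-new query (no checkpoint yet) and
  // Some(confs) on restart, where confs are the values recorded in the offset log.
  def effective(
      sessionConfs: Map[String, String],
      offsetLogConfs: Option[Map[String, String]]): Boolean =
    offsetLogConfs match {
      case Some(pinned) =>
        // Restart: the pinned value wins over the session, even if they differ.
        pinned.get(Key).map(_.toBoolean).getOrElse(LegacyDefault)
      case None =>
        // New query: the session value applies.
        sessionConfs.get(Key).map(_.toBoolean).getOrElse(SessionDefault)
    }
}
```

Under these assumptions a checkpoint written before this change resolves to the legacy order regardless of the session setting, which matches the "existing checkpoints keep their original key order" guarantee.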

How was this patch tested?

New unit suite ResolveDeduplicateSuite, a cross-engine interop suite StreamingDeduplicationConnectInteropSuite, and an end-to-end offset-log fallback suite StreamingDeduplicationFallbackSuite.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.8

…cates key resolution via shared analyzer rule

### What changes were proposed in this pull request?

`dropDuplicates` / `dropDuplicatesWithinWatermark` historically resolve their
key columns eagerly: in the DataFrame implementation for Spark Classic
(`Dataset.groupColsFromDropDuplicates`, via `colNames.toSet.toSeq`) and in the
Spark Connect planner (`transformDeduplicate`). The Classic path's key order
therefore depends on Scala's `Set` iteration order, which is undocumented and
changed between Scala 2.12 and 2.13, and which also differs from Spark Connect.
Because streaming deduplication binds state-store keys by position, this
non-determinism can break query restarts.

This keeps binding by position, but makes the resolution deterministic and
shared between engines:

- Add an unresolved logical node `UnresolvedDeduplicate` and an analyzer rule
  `ResolveDeduplicate` that both Spark Classic (`Dataset.dropDuplicates*`) and
  Spark Connect (`transformDeduplicate`) defer to. The rule resolves the
  requested columns with an explicit, order-preserving dedup (intentionally NOT
  `Seq.distinct`/`Set`, whose ordering is not contractually guaranteed), so the
  key order is deterministic and identical across Scala versions and engines.
- Add `spark.sql.dropDuplicates.deterministicKeyOrder.enabled` (default `true`),
  persisted via the offset log (`OffsetSeqMetadata.relevantSQLConfs`, with legacy
  default `false`), so the deterministic order applies only to newly started
  streaming queries; existing checkpoints keep their original key order. When the
  conf is off, the rule reproduces each engine's legacy resolution (Classic
  `toSet`, Connect identity), selected by the node's `legacyDedupColumnNames`.
- Resolve the node under the offset-log-pinned conf during initial analysis via a
  bootstrap in `StreamingQueryManager.createQuery` (only when an
  `UnresolvedDeduplicate` is present), mirroring how other initial-analysis confs
  are pinned per query.

NOTE: This is `[WIP][REQUIRE-HUMAN-REVIEW]`. `SPARK-XXXXX` is a placeholder; an
Apache Spark JIRA will be filed and the number substituted before the upstream PR.

### Why are the changes needed?

Streaming deduplication binds state-store keys by position. A key order that
depends on Scala's `Set` iteration order is non-deterministic across Scala
versions and differs between Spark Classic and Spark Connect, which can break
streaming query restarts and cross-engine checkpoint compatibility.

### Does this PR introduce _any_ user-facing change?

Yes. For newly started streaming `dropDuplicates`/`dropDuplicatesWithinWatermark`
queries, the key columns are resolved in a deterministic, first-occurrence order
instead of a Scala-`Set`-dependent order. Deduplication results (rows) are
unchanged; only the internal key ordering used for the state store changes.
Existing checkpointed queries are unaffected - the effective order is pinned per
query via the offset log. Spark Classic and Spark Connect now resolve
`dropDuplicates` keys identically, and an unresolvable `dropDuplicates` column now
fails at query analysis rather than at the API call in Spark Classic.

### How was this patch tested?

New unit suite `ResolveDeduplicateSuite`, a cross-engine interop suite
`StreamingDeduplicationConnectInteropSuite`, and an end-to-end offset-log fallback
suite `StreamingDeduplicationFallbackSuite`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor
- Rename local `deterministic` -> `orderDeterministically` in ResolveDeduplicate.
- Rename `UnresolvedDeduplicate.legacyDedupColumnNames` -> `viaSparkClassic`
  (states the originating engine directly) and update all call sites/tests.
- Clarify in StreamingQueryManager why we intentionally bootstrap the pinned conf
  here instead of reusing OffsetSeqMetadata.setSessionConf, and that the cost is
  scoped to plans containing UnresolvedDeduplicate.
- In StreamingDeduplicationFallbackSuite, flip the session default to false before
  the restart so the test proves the offset-log-pinned value overrides the session.
Format StreamingDeduplicationConnectInteropSuite per dev/.scalafmt.conf
(maxColumn=98, Asterisk docstrings) to satisfy the connect scalafmt check.
The createQuery bootstrap previously looked for an UnresolvedDeduplicate in
df.queryExecution.logical, but Spark Classic chains on the analyzed plan, so any
operation after dropDuplicates resolves the node before the streaming query
starts; the offset-log fallback then never engaged and existing checkpoints
loaded with the legacy key order failed state-store key validation
(StreamingStateStoreFormatCompatibilitySuite, KafkaMicroBatchV2Source*Suite).

Instead, retain the original recipe on the resolved node:
- Add dedupSpec: Option[DeduplicateSpec] (subset, allColumnsAsKeys,
  viaSparkClassic) to Deduplicate / DeduplicateWithinWatermark, defaulting to
  None. ResolveDeduplicate sets Some(spec) and resolves the keys with the
  session conf; a None node is never recomputed (safe for internally/test
  constructed nodes).
- Factor the ordering logic into ResolveDeduplicate.computeKeys, shared by the
  rule and the bootstrap.
- StreamingQueryManager.createQuery recomputes the keys in place from the recipe
  using the order pinned in the offset log, only when it differs from the
  session value. This works regardless of operations after dropDuplicates and is
  correct for both Spark Classic and Spark Connect.
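
As a rough sketch of the recipe idea, the following models `DeduplicateSpec` and a `computeKeys` function over plain column names. The field types, the `outputColumns` parameter, and the treatment of `allColumnsAsKeys` in the legacy branches are assumptions made for illustration; the real code resolves attributes against the child plan.

```scala
// Hypothetical, string-based model; not the code from this PR.
object DedupRecipe {
  final case class DeduplicateSpec(
      subset: Seq[String],
      allColumnsAsKeys: Boolean,
      viaSparkClassic: Boolean)

  // Order-preserving dedup that keeps the first occurrence of each name.
  private def firstOccurrence(names: Seq[String]): Seq[String] =
    names.foldLeft((Vector.empty[String], Set.empty[String])) {
      case ((kept, seen), name) =>
        if (seen(name)) (kept, seen) else (kept :+ name, seen + name)
    }._1

  def computeKeys(
      spec: DeduplicateSpec,
      outputColumns: Seq[String],
      orderDeterministically: Boolean): Seq[String] = {
    val requested = if (spec.allColumnsAsKeys) outputColumns else spec.subset
    if (orderDeterministically) firstOccurrence(requested)
    else if (spec.viaSparkClassic) requested.toSet.toSeq // legacy Classic: Set order
    else requested // legacy Connect: names used as given
  }
}
```

Because the spec keeps the originally requested names, the keys can be recomputed later under a different `orderDeterministically` value without re-running analysis.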

StreamingDeduplicationFallbackSuite now projects after dropDuplicates to cover
the case that previously broke.
Adding dedupSpec to Deduplicate/DeduplicateWithinWatermark made it show up in
the tree string (failing ProtoToParsedPlanTestSuite golden files) and in
structural plan equality (failing SparkConnectProtoSuite, which compares a
Connect-built plan against a Classic-built one - they now legitimately differ
only in DeduplicateSpec.viaSparkClassic).

dedupSpec is internal metadata that only drives the legacy key-order fallback on
streaming restart, not the resolved plan's semantics, so:
- override stringArgs on both nodes to omit it from the tree string (also keeps
  it out of EXPLAIN output), and
- ignore it in SparkConnectProtoSuite's cross-engine plan comparison.
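
The two measures can be illustrated with a toy node type. This is a sketch only: `Node` is a stand-in for a plan node whose printed arguments default to the constructor arguments, and `sameIgnoringSpec` is a hypothetical helper showing one way to compare nodes while ignoring the metadata field.

```scala
// Hypothetical toy model; not Spark's TreeNode or the test code from this PR.
object HiddenPlanMetadata {
  trait Node extends Product {
    // By default every constructor argument is printed.
    protected def stringArgs: Iterator[Any] = productIterator
    def simpleString: String = s"$productPrefix(${stringArgs.mkString(", ")})"
  }

  final case class Dedup(keys: Seq[String], dedupSpec: Option[String]) extends Node {
    // Print only the keys, so the internal recipe never reaches the tree string.
    override protected def stringArgs: Iterator[Any] = Iterator(keys)
  }

  // Compare two nodes while ignoring the internal metadata field.
  def sameIgnoringSpec(a: Dedup, b: Dedup): Boolean =
    a.copy(dedupSpec = None) == b.copy(dedupSpec = None)
}
```

Note that overriding the printed arguments does not change case-class equality, which is why the cross-engine comparison needs its own handling.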
…redundant doc

- Restore the orderDeterministically naming (lost in the rework) for the boolean
  flag in ResolveDeduplicate.computeKeys and the StreamingQueryManager bootstrap
  (deterministic is overloaded).
- Remove the redundant doc comment on DeduplicateWithinWatermark.
The recompute is streaming-specific and no longer touches the analyzer, so the
generic StreamingQueryManager.createQuery bootstrap was the wrong home. Move it
into MicroBatchExecution.logicalPlan (the streaming logical plan, built once at
startup), recomputing the dedup keys from the recipe using the order read from
the offset log via OffsetSeqMetadata.readValueOpt - mirroring how the adjacent
`enforceNamed` (ENABLE_STREAMING_SOURCE_EVOLUTION) is read. This reuses the
existing cached offset-log read, drops the separate checkpoint-location
resolution and session handling in createQuery, and reverts createQuery to its
original form.
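
For illustration, the in-place recompute can be pictured as a walk over the plan that rewrites only dedup nodes carrying a recipe. The `Plan` types and `recompute` below are a hypothetical toy model, not `MicroBatchExecution` code; the `order` argument stands for whichever key ordering the offset log selects.

```scala
// Hypothetical toy plan; not Spark's LogicalPlan.
object RecomputeDedupKeys {
  sealed trait Plan
  final case class Source(columns: Seq[String]) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan
  // recipe holds the originally requested names; None marks a node that was
  // constructed without a recipe and must never be recomputed.
  final case class Dedup(keys: Seq[String], recipe: Option[Seq[String]], child: Plan)
    extends Plan

  def recompute(plan: Plan, order: Seq[String] => Seq[String]): Plan = plan match {
    case Dedup(_, Some(requested), child) =>
      // Rebuild the keys from the recipe and keep the recipe on the node.
      Dedup(order(requested), Some(requested), recompute(child, order))
    case d @ Dedup(_, None, child) =>
      d.copy(child = recompute(child, order))
    case Project(columns, child) =>
      Project(columns, recompute(child, order))
    case s: Source => s
  }
}
```

Since the walk finds dedup nodes anywhere in the tree, operators stacked on top of `dropDuplicates` (such as the projection in this model) do not prevent the recompute.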