[SPARK-57489][SS] Deterministic dropDuplicates key resolution via shared analyzer rule#56546
Open
HeartSaVioR wants to merge 8 commits into
Open
[SPARK-57489][SS] Deterministic dropDuplicates key resolution via shared analyzer rule#56546HeartSaVioR wants to merge 8 commits into
HeartSaVioR wants to merge 8 commits into
Conversation
…cates key resolution via shared analyzer rule ### What changes were proposed in this pull request? `dropDuplicates` / `dropDuplicatesWithinWatermark` historically resolve their key columns eagerly: in the DataFrame implementation for Spark Classic (`Dataset.groupColsFromDropDuplicates`, via `colNames.toSet.toSeq`) and in the Spark Connect planner (`transformDeduplicate`). The Classic path's key order therefore depends on Scala's `Set` iteration order, which is undocumented and changed between Scala 2.12 and 2.13, and which also differs from Spark Connect. Because streaming deduplication binds state-store keys by position, this non-determinism can break query restarts. This keeps binding by position, but makes the resolution deterministic and shared between engines: - Add an unresolved logical node `UnresolvedDeduplicate` and an analyzer rule `ResolveDeduplicate` that both Spark Classic (`Dataset.dropDuplicates*`) and Spark Connect (`transformDeduplicate`) defer to. The rule resolves the requested columns with an explicit, order-preserving dedup (intentionally NOT `Seq.distinct`/`Set`, whose ordering is not contractually guaranteed), so the key order is deterministic and identical across Scala versions and engines. - Add `spark.sql.dropDuplicates.deterministicKeyOrder.enabled` (default `true`), persisted via the offset log (`OffsetSeqMetadata.relevantSQLConfs`, with legacy default `false`), so the deterministic order applies only to newly started streaming queries; existing checkpoints keep their original key order. When the conf is off, the rule reproduces each engine's legacy resolution (Classic `toSet`, Connect identity), selected by the node's `legacyDedupColumnNames`. - Resolve the node under the offset-log-pinned conf during initial analysis via a bootstrap in `StreamingQueryManager.createQuery` (only when an `UnresolvedDeduplicate` is present), mirroring how other initial-analysis confs are pinned per query. NOTE: This is `[WIP][REQUIRE-HUMAN-REVIEW]`. `SPARK-XXXXX` is a placeholder; an Apache Spark JIRA will be filed and the number substituted before the upstream PR. ### Why are the changes needed? Streaming deduplication binds state-store keys by position. A key order that depends on Scala's `Set` iteration order is non-deterministic across Scala versions and differs between Spark Classic and Spark Connect, which can break streaming query restarts and cross-engine checkpoint compatibility. ### Does this PR introduce _any_ user-facing change? Yes. For newly started streaming `dropDuplicates`/`dropDuplicatesWithinWatermark` queries, the key columns are resolved in a deterministic, first-occurrence order instead of a Scala-`Set`-dependent order. Deduplication results (rows) are unchanged; only the internal key ordering used for the state store changes. Existing checkpointed queries are unaffected - the effective order is pinned per query via the offset log. Spark Classic and Spark Connect now resolve `dropDuplicates` keys identically, and an unresolvable `dropDuplicates` column now fails at query analysis rather than at the API call in Spark Classic. ### How was this patch tested? New unit suite `ResolveDeduplicateSuite`, a cross-engine interop suite `StreamingDeduplicationConnectInteropSuite`, and an end-to-end offset-log fallback suite `StreamingDeduplicationFallbackSuite`. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor
- Rename local `deterministic` -> `orderDeterministically` in ResolveDeduplicate. - Rename `UnresolvedDeduplicate.legacyDedupColumnNames` -> `viaSparkClassic` (states the originating engine directly) and update all call sites/tests. - Clarify in StreamingQueryManager why we intentionally bootstrap the pinned conf here instead of reusing OffsetSeqMetadata.setSessionConf, and that the cost is scoped to plans containing UnresolvedDeduplicate. - In StreamingDeduplicationFallbackSuite, flip the session default to false before the restart so the test proves the offset-log-pinned value overrides the session.
Format StreamingDeduplicationConnectInteropSuite per dev/.scalafmt.conf (maxColumn=98, Asterisk docstrings) to satisfy the connect scalafmt check.
The createQuery bootstrap previously looked for an UnresolvedDeduplicate in df.queryExecution.logical, but Spark Classic chains on the analyzed plan, so any operation after dropDuplicates resolves the node before the streaming query starts; the offset-log fallback then never engaged and existing checkpoints loaded with the legacy key order failed state-store key validation (StreamingStateStoreFormatCompatibilitySuite, KafkaMicroBatchV2Source*Suite). Instead, retain the original recipe on the resolved node: - Add dedupSpec: Option[DeduplicateSpec] (subset, allColumnsAsKeys, viaSparkClassic) to Deduplicate / DeduplicateWithinWatermark, defaulting to None. ResolveDeduplicate sets Some(spec) and resolves the keys with the session conf; a None node is never recomputed (safe for internally/test constructed nodes). - Factor the ordering logic into ResolveDeduplicate.computeKeys, shared by the rule and the bootstrap. - StreamingQueryManager.createQuery recomputes the keys in place from the recipe using the order pinned in the offset log, only when it differs from the session value. This works regardless of operations after dropDuplicates and is correct for both Spark Classic and Spark Connect. StreamingDeduplicationFallbackSuite now projects after dropDuplicates to cover the case that previously broke.
Adding dedupSpec to Deduplicate/DeduplicateWithinWatermark made it show up in the tree string (failing ProtoToParsedPlanTestSuite golden files) and in structural plan equality (failing SparkConnectProtoSuite, which compares a Connect-built plan against a Classic-built one - they now legitimately differ only in DeduplicateSpec.viaSparkClassic). dedupSpec is internal metadata that only drives the legacy key-order fallback on streaming restart, not the resolved plan's semantics, so: - override stringArgs on both nodes to omit it from the tree string (also keeps it out of EXPLAIN output), and - ignore it in SparkConnectProtoSuite's cross-engine plan comparison.
…redundant doc - Restore the orderDeterministically naming (lost in the rework) for the boolean flag in ResolveDeduplicate.computeKeys and the StreamingQueryManager bootstrap (deterministic is overloaded). - Remove the redundant doc comment on DeduplicateWithinWatermark.
The recompute is streaming-specific and no longer touches the analyzer, so the generic StreamingQueryManager.createQuery bootstrap was the wrong home. Move it into MicroBatchExecution.logicalPlan (the streaming logical plan, built once at startup), recomputing the dedup keys from the recipe using the order read from the offset log via OffsetSeqMetadata.readValueOpt - mirroring how the adjacent `enforceNamed` (ENABLE_STREAMING_SOURCE_EVOLUTION) is read. This reuses the existing cached offset-log read, drops the separate checkpoint-location resolution and session handling in createQuery, and reverts createQuery to its original form.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
dropDuplicates/dropDuplicatesWithinWatermarkhistorically resolve their key columns eagerly: in the DataFrame implementation for Spark Classic (Dataset.groupColsFromDropDuplicates, viacolNames.toSet.toSeq) and in the Spark Connect planner (transformDeduplicate). The Spark Classic path's key order therefore depends on Scala'sSetiteration order, which is undocumented and changed between Scala 2.12 and 2.13, and which also differs from Spark Connect. Because streaming deduplication binds state-store keys by position, this non-determinism can break query restarts.This keeps binding by position, but makes the resolution deterministic and shared between engines:
UnresolvedDeduplicateand an analyzer ruleResolveDeduplicatethat both Spark Classic (Dataset.dropDuplicates*) and Spark Connect (transformDeduplicate) defer to. The rule resolves the requested columns with an explicit, order-preserving dedup (intentionally NOTSeq.distinct/Set, whose ordering is not contractually guaranteed), so the key order is deterministic and identical across Scala versions and engines.spark.sql.dropDuplicates.deterministicKeyOrder.enabled(defaulttrue), persisted via the offset log (OffsetSeqMetadata.relevantSQLConfs, with legacy defaultfalse), so the deterministic order applies only to newly started streaming queries; existing checkpoints keep their original key order. When the conf is off, the rule reproduces each engine's legacy resolution (ClassictoSet, Connect identity), selected by the node'slegacyDedupColumnNames.StreamingQueryManager.createQuery(only when anUnresolvedDeduplicateis present), mirroring how other initial-analysis confs are pinned per query.Why are the changes needed?
Streaming deduplication binds state-store keys by position. A key order that depends on Scala's
Setiteration order is non-deterministic across Scala versions and differs between Spark Classic and Spark Connect, which can break streaming query restarts and cross-engine checkpoint compatibility.Does this PR introduce any user-facing change?
Yes. For newly started streaming
dropDuplicates/dropDuplicatesWithinWatermarkqueries, the key columns are resolved in a deterministic, first-occurrence order instead of a Scala-Set-dependent order. Deduplication results (rows) are unchanged; only the internal key ordering used for the state store changes. Existing checkpointed queries are unaffected - the effective order is pinned per query via the offset log. Spark Classic and Spark Connect now resolvedropDuplicateskeys identically, and an unresolvabledropDuplicatescolumn now fails at query analysis rather than at the API call in Spark Classic.How was this patch tested?
New unit suite
ResolveDeduplicateSuite, a cross-engine interop suiteStreamingDeduplicationConnectInteropSuite, and an end-to-end offset-log fallback suiteStreamingDeduplicationFallbackSuite.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.8