Python: Refactor runner/workflow responsibilities and fix checkpoint ancestry bug #6695
TaoChenOSU wants to merge 1 commit into
…d fix checkpoint ancestry bug
Move runner-state ownership out of `Workflow` into `Runner` for clearer responsibilities. Add a weakref-based concurrent-run guard in `Workflow` and fix the stream-drop race in `run_until_convergence`. Fix the checkpoint ancestry bug by tracking the previous checkpoint id as runner instance state so parent pointers persist across resumed runs. Move `Runner` to a deprecated lazy `__getattr__` export (backward-compatible with `DeprecationWarning`) and export `CheckpointID`.
Pull request overview
Refactors Python workflow execution internals to better separate Workflow vs Runner responsibilities, improves concurrency/run-lifecycle guarding for Workflow.run(), and fixes checkpoint ancestry so post-resume checkpoints correctly chain to the resumed checkpoint.
Changes:
- Move run/checkpoint bookkeeping to `Runner` and persist `previous_checkpoint_id` across resumed runs.
- Replace the boolean concurrent-run guard with a weakref-backed guard and add tests covering stream-drop/finalizer races.
- Deprecate the public `Runner` export via lazy `__getattr__` while keeping backward compatibility, and export `CheckpointID`.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| python/packages/core/tests/workflow/test_workflow.py | Updates concurrency-guard expectations and adds regression tests for lock release and stale-finalizer races. |
| python/packages/core/tests/workflow/test_runner.py | Updates runner lifecycle expectations and adds tests for checkpoint chaining after resume. |
| python/packages/core/tests/workflow/test_checkpoint.py | Adds an end-to-end ancestry test ensuring checkpoint chains remain intact across resume boundaries. |
| python/packages/core/agent_framework/_workflows/_workflow.py | Implements weakref-based active-run guard and moves cleanup into _run_core with stale-finalizer protection. |
| python/packages/core/agent_framework/_workflows/_runner.py | Moves checkpoint ancestry tracking into runner instance state and updates resume handling. |
| python/packages/core/agent_framework/__init__.py | Exports `CheckpointID` and lazily re-exports the deprecated `Runner` with a `DeprecationWarning`. |
| # Validate parameters first so misuse fails before we touch any run state. | ||
| self._validate_run_params(message, responses, checkpoint_id) | ||
| self._ensure_not_running() | ||
|
|
| finally: | ||
| # Clear the active-run weakref so a subsequent ``run()`` is allowed, | ||
| # but only if the slot still holds *our* weakref. If the caller | ||
| # dropped this stream after partial iteration and a new ``run()`` | ||
| # already installed its own weakref before our async-gen finalizer | ||
| # ran, ``self._active_run`` now points at the successor; clearing | ||
| # it would silently break the successor's concurrency guard. | ||
| if self._active_run is my_active_run: | ||
| self._active_run = None | ||
| if checkpoint_storage is not None: | ||
| self._runner.context.clear_runtime_checkpoint_storage() |
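The identity check in the `finally` block above can be illustrated in isolation. The following is a minimal sketch, not the PR's implementation: `RunHandle`, `GuardedWorkflow`, `start`, and `finish` are hypothetical names, and only `_active_run` and `my_active_run` mirror identifiers from the diff. It assumes the guard stores a `weakref.ref` to a per-run object and treats a dead reference as "no run in progress".

```python
import gc
import weakref
from typing import Optional


class RunHandle:
    """Hypothetical stand-in for the stream object returned by run()."""


class GuardedWorkflow:
    """Hypothetical sketch of a weakref-backed active-run slot."""

    def __init__(self) -> None:
        self._active_run: Optional[weakref.ref] = None

    def start(self):
        # A dead weakref means the previous stream was dropped and collected,
        # so only a still-alive handle blocks a new run.
        if self._active_run is not None and self._active_run() is not None:
            raise RuntimeError("Workflow is already running.")
        handle = RunHandle()
        my_active_run = weakref.ref(handle)
        self._active_run = my_active_run
        return handle, my_active_run

    def finish(self, my_active_run) -> None:
        # Clear the slot only if it still holds *our* weakref; a late
        # finalizer must not wipe a successor's entry.
        if self._active_run is my_active_run:
            self._active_run = None


workflow = GuardedWorkflow()
first, first_ref = workflow.start()
del first      # the caller drops the first stream
gc.collect()   # make collection explicit on non-refcounting interpreters
second, second_ref = workflow.start()  # allowed: the old weakref is dead
workflow.finish(first_ref)             # late finalizer from the first run
successor_still_guarded = workflow._active_run is second_ref
```

With an unconditional `self._active_run = None` in `finish`, the late call for the first run would clear the second run's entry and a third concurrent `start()` would be accepted.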
Automated Code Review
Reviewers: 5 | Confidence: 88%
✓ Correctness
The PR correctly fixes the checkpoint ancestry bug by promoting `_previous_checkpoint_id` to `Runner` instance state and properly seeding it in `_mark_resumed`. The weakref-based concurrency guard in `Workflow` is well-designed: the identity check (`if self._active_run is my_active_run`) in the `finally` block correctly prevents the stale-finalizer race, and the GC-time weakref invalidation handles dropped/unconsumed streams. The removal of the Runner's `_running` flag is clean since concurrency enforcement now lives at the Workflow level. No correctness issues found.
✓ Security Reliability
This PR is clean from a security and reliability standpoint. The weakref-based concurrency guard correctly handles the stale-finalizer race via identity comparison in the finally block. The checkpoint ancestry fix properly persists previous_checkpoint_id as Runner instance state so it survives across resumed runs. Resource cleanup (clearing runtime checkpoint storage) is reliably executed in _run_core's finally regardless of how the run terminates. The removal of Runner's self-locking is explicitly delegated to Workflow, and Runner is deprecated from the public API. No injection risks, resource leaks, or unhandled failure modes were identified.
✓ Test Coverage
The PR has excellent test coverage for its core changes: checkpoint ancestry preservation, the weakref-based concurrent-run guard, sequential reuse after failure, the GC-finalizer race fix, and unconsumed stream cleanup. The primary test coverage gap is the new `Runner` deprecation warning behavior: `from agent_framework import Runner` now emits a `DeprecationWarning` via a module-level `__getattr__`, but no test verifies this user-facing behavior change. All other significant behavioral changes have corresponding tests with meaningful assertions.
✓ Failure Modes
The PR correctly implements the weakref-based concurrency guard and checkpoint ancestry fix. However, the `_run_core` `finally` block has a stale-finalizer race: `clear_runtime_checkpoint_storage()` is not guarded by the same identity check that protects `self._active_run`, so a dropped stream's deferred finalizer can silently erase a successor run's checkpoint storage.
✓ Design Approach
I found one correctness issue in the new runner/workflow split: resume state is only cleared on the successful exit path, so a resumed run that fails leaves stale checkpoint ancestry state behind for the next fresh run on the same workflow instance.
Automated review by TaoChenOSU's agents
| if self._iteration >= self._max_iterations and await self._ctx.has_messages(): | ||
| raise WorkflowConvergenceException(f"Runner did not converge after {self._max_iterations} iterations.") | ||
| logger.info(f"Workflow completed after {self._iteration} supersteps") | ||
| self._resumed_from_checkpoint = False # Reset resume flag for next run |
_mark_resumed() sets _resumed_from_checkpoint = True and _previous_checkpoint_id during restore, but this flag is only cleared on the success path at line 176. If run_until_convergence() raises before reaching this line (e.g., an executor failure during a resumed run), the next fresh Workflow.run(message=...) on the same instance will still see _resumed_from_checkpoint == True, skip the initial "superstep 0" checkpoint, and parent later checkpoints to the stale resume point. The flag should be cleared in a finally or at the top of _run_core to prevent stale resume state from leaking across runs. Additionally, test_runner_accepts_new_run_after_previous_failure doesn't verify checkpoint creation behavior on the re-run after a resumed convergence failure.
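One way to read the suggestion above is to move the flag reset into a `finally` so it runs on both the success and the failure path. The sketch below assumes that reading. `SketchRunner` and the `fail` parameter are hypothetical, only the attribute and method names are taken from the comment, and the real `run_until_convergence` does far more work than shown here.

```python
import asyncio
from typing import Optional


class SketchRunner:
    """Hypothetical runner holding only the resume-related state."""

    def __init__(self) -> None:
        self._resumed_from_checkpoint = False
        self._previous_checkpoint_id: Optional[str] = None

    def _mark_resumed(self, checkpoint_id: str) -> None:
        self._resumed_from_checkpoint = True
        self._previous_checkpoint_id = checkpoint_id

    async def run_until_convergence(self, fail: bool = False) -> str:
        try:
            if fail:
                raise RuntimeError("executor failed during a resumed run")
            return "converged"
        finally:
            # Runs on success and on failure, so the resume flag cannot
            # leak into the next fresh run on the same instance.
            self._resumed_from_checkpoint = False


async def resumed_run_that_fails() -> SketchRunner:
    runner = SketchRunner()
    runner._mark_resumed("checkpoint-1")
    try:
        await runner.run_until_convergence(fail=True)
    except RuntimeError:
        pass
    return runner


runner_after_failure = asyncio.run(resumed_run_that_fails())
```

The sketch deliberately leaves `_previous_checkpoint_id` untouched. Whether a fresh run should also reset it is a separate design decision that the comment raises but this example does not settle.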
Motivation & Context
This PR is the first PR for breaking down the changes in #6407 into smaller PRs.
The runner/workflow internals had grown entangled:
`Workflow` owned runtime checkpoint bookkeeping that conceptually belongs to the `Runner`, concurrent `run()` calls on the same instance were not reliably guarded, and the checkpoint parent pointer (`previous_checkpoint_id`) was tracked as a per-run local inside `run_until_convergence`, so checkpoint ancestry was lost across resumed runs.
This is the first of three PRs that split a larger change. It establishes the runner/workflow foundation that the follow-up sub-workflow checkpoint PR and the workflow-reset PR build on.
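The ancestry problem can be shown with a toy model. This is a minimal sketch, assuming checkpoints form a singly linked chain through `previous_checkpoint_id`. `Checkpoint`, `ChainRunner`, `mark_resumed`, `save_checkpoint`, and `ancestry` are hypothetical names, and the real storage and ID scheme will differ. Keeping the parent pointer as instance state and seeding it on resume lets the first post-resume checkpoint link back to the resume point instead of starting a new chain.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Checkpoint:
    checkpoint_id: str
    previous_checkpoint_id: Optional[str]


class ChainRunner:
    """Hypothetical runner that keeps the parent pointer as instance state."""

    def __init__(self, storage: Dict[str, Checkpoint]) -> None:
        self._storage = storage
        self._previous_checkpoint_id: Optional[str] = None

    def mark_resumed(self, checkpoint_id: str) -> None:
        # Seed the parent pointer so the next checkpoint chains to the
        # checkpoint we resumed from.
        self._previous_checkpoint_id = checkpoint_id

    def save_checkpoint(self) -> str:
        checkpoint_id = f"cp-{len(self._storage) + 1}"
        self._storage[checkpoint_id] = Checkpoint(
            checkpoint_id, self._previous_checkpoint_id
        )
        self._previous_checkpoint_id = checkpoint_id
        return checkpoint_id


def ancestry(storage: Dict[str, Checkpoint], checkpoint_id: str) -> List[str]:
    """Walk parent pointers from the given checkpoint back to the root."""
    chain = []
    current: Optional[str] = checkpoint_id
    while current is not None:
        chain.append(current)
        current = storage[current].previous_checkpoint_id
    return chain


storage: Dict[str, Checkpoint] = {}
first_run = ChainRunner(storage)
first_run.save_checkpoint()
resume_point = first_run.save_checkpoint()

resumed_run = ChainRunner(storage)
resumed_run.mark_resumed(resume_point)
latest = resumed_run.save_checkpoint()
chain = ancestry(storage, latest)
```

If the pointer were a local variable initialized to `None` at the start of each run, the checkpoint saved after resume would have no parent and `ancestry` would stop at it.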
Description & Review Guide
- Move runner-state ownership out of `Workflow` into `Runner` for clearer responsibilities.
- Add a weakref-based concurrent-run guard in `Workflow` and fix the stream-drop race in `run_until_convergence`.
- Fix the checkpoint ancestry bug by tracking `previous_checkpoint_id` as `Runner` instance state, so checkpoint parent pointers persist across resumed runs.
- Move `Runner` to a deprecated lazy `__getattr__` export (it remains importable from `agent_framework` but now emits a `DeprecationWarning`) and export `CheckpointID`.
- `from agent_framework import Runner` keeps working (with a warning); no public API is removed.
- […] `_runner.py` and the concurrent-run guard / stream-drop handling in `_workflow.py`.
Related Issue
Fixes #4588
Contribution Checklist
`breaking change` label (or add "[BREAKING]" to the title prefix, before or after any language prefix); a workflow keeps the label and title prefix in sync automatically.