
Python: Refactor runner/workflow responsibilities and fix checkpoint ancestry bug #6695

Draft
TaoChenOSU wants to merge 1 commit into main from feature/python-runner-refactor

@TaoChenOSU TaoChenOSU commented Jun 24, 2026

Motivation & Context

This PR is the first in a series that breaks the changes in #6407 into smaller PRs.

The runner/workflow internals had grown entangled: Workflow owned runtime checkpoint bookkeeping that conceptually belongs to the Runner, concurrent run() calls on the same instance were not reliably guarded, and the checkpoint parent pointer (previous_checkpoint_id) was tracked as a per-run local inside run_until_convergence, so checkpoint ancestry was lost across resumed runs.

This is the first of three PRs that split a larger change. It establishes the runner/workflow foundation that the follow-up sub-workflow checkpoint PR and the workflow-reset PR build on.

Description & Review Guide

  • What are the major changes?
    • Move runner-state ownership out of Workflow into Runner for clearer responsibilities.
    • Add a weakref-based concurrent-run guard in Workflow and fix the stream-drop race in run_until_convergence.
    • Fix the checkpoint ancestry bug by tracking previous_checkpoint_id as Runner instance state, so checkpoint parent pointers persist across resumed runs.
    • Move Runner to a deprecated lazy __getattr__ export — it remains importable from agent_framework but now emits a DeprecationWarning — and export CheckpointID.
  • What is the impact of these changes?
    • Checkpoints created after a resume now form a correct ancestry chain.
    • Concurrent and aborted-stream runs are handled without orphaned work.
    • from agent_framework import Runner keeps working (with a warning); no public API is removed.
  • What do you want reviewers to focus on?
    • The checkpoint ancestry fix in _runner.py and the concurrent-run guard / stream-drop handling in _workflow.py.
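The ancestry fix described above can be illustrated with a minimal sketch. It is not the code in _runner.py: `SketchRunner`, `Checkpoint`, `mark_resumed`, `run_supersteps`, and `ancestry` are hypothetical names, and the shared dictionary stands in for checkpoint storage. The only point it tries to show is that keeping the parent pointer as instance state, seeded at resume time, lets post-resume checkpoints chain back to the resume point.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass


@dataclass
class Checkpoint:
    checkpoint_id: str
    previous_checkpoint_id: str | None


class SketchRunner:
    """Hypothetical stand-in for Runner; not the agent_framework class."""

    def __init__(self) -> None:
        # Stand-in for checkpoint storage (assumption for this sketch).
        self.saved: dict[str, Checkpoint] = {}
        # Instance state rather than a per-run local, so it survives a resume.
        self._previous_checkpoint_id: str | None = None

    def mark_resumed(self, checkpoint_id: str) -> None:
        # Seed the parent pointer with the checkpoint we resumed from.
        self._previous_checkpoint_id = checkpoint_id

    def run_supersteps(self, count: int) -> list[str]:
        created: list[str] = []
        for _ in range(count):
            cp = Checkpoint(uuid.uuid4().hex, self._previous_checkpoint_id)
            self.saved[cp.checkpoint_id] = cp
            self._previous_checkpoint_id = cp.checkpoint_id
            created.append(cp.checkpoint_id)
        return created


def ancestry(saved: dict[str, Checkpoint], checkpoint_id: str) -> list[str]:
    """Walk parent pointers from a checkpoint back to the root."""
    chain: list[str] = []
    current: str | None = checkpoint_id
    while current is not None:
        chain.append(current)
        current = saved[current].previous_checkpoint_id
    return chain
```

In this sketch, a second runner that shares the same storage and calls `mark_resumed` with the last checkpoint of the first run produces checkpoints whose `ancestry` walk reaches all the way back to the first run's root checkpoint.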

Related Issue

Fixes #4588

Contribution Checklist

  • The code builds clean without any errors or warnings
  • All unit tests pass, and I have added new tests where possible
  • The PR follows the Contribution Guidelines
  • This PR is linked to an issue and there is no other open PR for this issue (see Related Issue above).
  • This is not a breaking change. If it is a breaking change, add the breaking change label (or add "[BREAKING]" to the title prefix, before or after any language prefix) — a workflow keeps the label and title prefix in sync automatically.

…d fix checkpoint ancestry bug

Move runner-state ownership out of Workflow into Runner for clearer responsibilities. Add a weakref-based concurrent-run guard in Workflow and fix the stream-drop race in run_until_convergence. Fix the checkpoint ancestry bug by tracking the previous checkpoint id as runner instance state so parent pointers persist across resumed runs. Move Runner to a deprecated lazy __getattr__ export (backward-compatible with DeprecationWarning) and export CheckpointID.
Copilot AI review requested due to automatic review settings June 24, 2026 00:33
@moonbox3 moonbox3 added the python Usage: [Issues, PRs], Target: Python label Jun 24, 2026
@TaoChenOSU TaoChenOSU self-assigned this Jun 24, 2026
@TaoChenOSU TaoChenOSU added the workflows Usage: [Issues, PRs], Target: Workflows label Jun 24, 2026
@TaoChenOSU TaoChenOSU moved this to In Review in Agent Framework Jun 24, 2026

Copilot AI left a comment


Pull request overview

Refactors Python workflow execution internals to better separate Workflow vs Runner responsibilities, improves concurrency/run-lifecycle guarding for Workflow.run(), and fixes checkpoint ancestry so post-resume checkpoints correctly chain to the resumed checkpoint.

Changes:

  • Move run/checkpoint bookkeeping to Runner and persist previous_checkpoint_id across resumed runs.
  • Replace the boolean concurrent-run guard with a weakref-backed guard and add tests covering stream-drop/finalizer races.
  • Deprecate public Runner export via lazy __getattr__ while keeping backward compatibility, and export CheckpointID.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.

File | Description
python/packages/core/tests/workflow/test_workflow.py | Updates concurrency-guard expectations and adds regression tests for lock release and stale-finalizer races.
python/packages/core/tests/workflow/test_runner.py | Updates runner lifecycle expectations and adds tests for checkpoint chaining after resume.
python/packages/core/tests/workflow/test_checkpoint.py | Adds an end-to-end ancestry test ensuring checkpoint chains remain intact across resume boundaries.
python/packages/core/agent_framework/_workflows/_workflow.py | Implements weakref-based active-run guard and moves cleanup into _run_core with stale-finalizer protection.
python/packages/core/agent_framework/_workflows/_runner.py | Moves checkpoint ancestry tracking into runner instance state and updates resume handling.
python/packages/core/agent_framework/__init__.py | Exports CheckpointID and lazily re-exports deprecated Runner with a DeprecationWarning.

Comment on lines +742 to +744
# Validate parameters first so misuse fails before we touch any run state.
self._validate_run_params(message, responses, checkpoint_id)
self._ensure_not_running()

Comment on lines +847 to +857
finally:
    # Clear the active-run weakref so a subsequent ``run()`` is allowed,
    # but only if the slot still holds *our* weakref. If the caller
    # dropped this stream after partial iteration and a new ``run()``
    # already installed its own weakref before our async-gen finalizer
    # ran, ``self._active_run`` now points at the successor; clearing
    # it would silently break the successor's concurrency guard.
    if self._active_run is my_active_run:
        self._active_run = None
    if checkpoint_storage is not None:
        self._runner.context.clear_runtime_checkpoint_storage()

@github-actions github-actions Bot left a comment


Automated Code Review

Reviewers: 5 | Confidence: 88%

✓ Correctness

The PR correctly fixes the checkpoint ancestry bug by promoting _previous_checkpoint_id to Runner instance state and properly seeding it in _mark_resumed. The weakref-based concurrency guard in Workflow is well-designed: the identity check (if self._active_run is my_active_run) in the finally block correctly prevents the stale-finalizer race, and the GC-time weakref invalidation handles dropped/unconsumed streams. The removal of the Runner's _running flag is clean since concurrency enforcement now lives at the Workflow level. No correctness issues found.

✓ Security Reliability

This PR is clean from a security and reliability standpoint. The weakref-based concurrency guard correctly handles the stale-finalizer race via identity comparison in the finally block. The checkpoint ancestry fix properly persists previous_checkpoint_id as Runner instance state so it survives across resumed runs. Resource cleanup (clearing runtime checkpoint storage) is reliably executed in _run_core's finally regardless of how the run terminates. The removal of Runner's self-locking is explicitly delegated to Workflow, and Runner is deprecated from the public API. No injection risks, resource leaks, or unhandled failure modes were identified.

✓ Test Coverage

The PR has excellent test coverage for its core changes: checkpoint ancestry preservation, the weakref-based concurrent-run guard, sequential reuse after failure, the GC-finalizer race fix, and unconsumed stream cleanup. The primary test coverage gap is the new Runner deprecation warning behavior — from agent_framework import Runner now emits a DeprecationWarning via a module-level __getattr__, but no test verifies this user-facing behavior change. All other significant behavioral changes have corresponding tests with meaningful assertions.

✓ Failure Modes

The PR correctly implements the weakref-based concurrency guard and checkpoint ancestry fix. However, the _run_core finally block has a stale-finalizer race: clear_runtime_checkpoint_storage() is not guarded by the same identity check that protects self._active_run, so a dropped stream's deferred finalizer can silently erase a successor run's checkpoint storage.
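One possible way to close this gap is to put the storage cleanup under the same identity check as the active-run slot. The sketch below simulates the race deterministically rather than with a real async generator: `SketchWorkflow`, `RunHandle`, `begin`, `finish`, and `runtime_storage` are all hypothetical names, and `finish` stands in for the `finally` block of `_run_core`.

```python
import gc
import weakref


class RunHandle:
    """Hypothetical weakref-able object representing one run's stream."""


class SketchWorkflow:
    def __init__(self) -> None:
        self._active_run = None      # weakref to the current run's handle
        self.runtime_storage = None  # stand-in for runtime checkpoint storage

    def begin(self, handle: RunHandle, storage: str):
        active = self._active_run
        # A dead weakref means the previous stream was dropped, so allow the run.
        if active is not None and active() is not None:
            raise RuntimeError("Workflow is already running.")
        my_active_run = weakref.ref(handle)
        self._active_run = my_active_run
        self.runtime_storage = storage
        return my_active_run

    def finish(self, my_active_run) -> None:
        # Clean up only if the slot still holds *our* weakref; a stale
        # finalizer must not touch the successor's guard or storage.
        if self._active_run is my_active_run:
            self._active_run = None
            self.runtime_storage = None


def simulate_stale_finalizer():
    wf = SketchWorkflow()
    handle_a = RunHandle()
    ref_a = wf.begin(handle_a, "storage-a")
    del handle_a   # the caller drops the first stream
    gc.collect()   # make sure the weakref is dead on any interpreter
    handle_b = RunHandle()
    ref_b = wf.begin(handle_b, "storage-b")
    wf.finish(ref_a)  # the first run's deferred finalizer fires late
    survived = wf.runtime_storage
    still_guarded = wf._active_run is ref_b
    wf.finish(ref_b)
    return survived, still_guarded, wf.runtime_storage
```

In this simulation the late `finish(ref_a)` call is a no-op, so the second run keeps both its guard and its storage until its own cleanup runs.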

✓ Design Approach

I found one correctness issue in the new runner/workflow split: resume state is only cleared on the successful exit path, so a resumed run that fails leaves stale checkpoint ancestry state behind for the next fresh run on the same workflow instance.


Automated review by TaoChenOSU's agents

if self._iteration >= self._max_iterations and await self._ctx.has_messages():
    raise WorkflowConvergenceException(f"Runner did not converge after {self._max_iterations} iterations.")
logger.info(f"Workflow completed after {self._iteration} supersteps")
self._resumed_from_checkpoint = False  # Reset resume flag for next run


_mark_resumed() sets _resumed_from_checkpoint = True and _previous_checkpoint_id during restore, but this flag is only cleared on the success path at line 176. If run_until_convergence() raises before reaching this line (e.g., an executor failure during a resumed run), the next fresh Workflow.run(message=...) on the same instance will still see _resumed_from_checkpoint == True, skip the initial "superstep 0" checkpoint, and parent later checkpoints to the stale resume point. The flag should be cleared in a finally or at the top of _run_core to prevent stale resume state from leaking across runs. Additionally, test_runner_accepts_new_run_after_previous_failure doesn't verify checkpoint creation behavior on the re-run after a resumed convergence failure.
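The suggestion to clear the flag in a `finally` can be sketched as follows. This is a simplified stand-in, not the actual Runner: `SketchRunner`, the `fail` parameter, and the choice to reset the parent pointer at the start of a non-resumed run are assumptions made for illustration.

```python
import asyncio


class SketchRunner:
    """Hypothetical stand-in for Runner; not the agent_framework class."""

    def __init__(self) -> None:
        self._resumed_from_checkpoint = False
        self._previous_checkpoint_id = None

    def mark_resumed(self, checkpoint_id: str) -> None:
        self._resumed_from_checkpoint = True
        self._previous_checkpoint_id = checkpoint_id

    async def run_until_convergence(self, fail: bool = False) -> str:
        # Assumption: a fresh (non-resumed) run starts a new ancestry chain.
        if not self._resumed_from_checkpoint:
            self._previous_checkpoint_id = None
        try:
            if fail:
                raise RuntimeError("executor failed")
            return "converged"
        finally:
            # Runs on success and on failure, so stale resume state
            # cannot leak into the next run on the same instance.
            self._resumed_from_checkpoint = False


def demo_failed_resume_then_fresh_run():
    runner = SketchRunner()
    runner.mark_resumed("cp-1")
    try:
        asyncio.run(runner.run_until_convergence(fail=True))
    except RuntimeError:
        pass
    flag_after_failure = runner._resumed_from_checkpoint
    asyncio.run(runner.run_until_convergence())
    return flag_after_failure, runner._previous_checkpoint_id
```

With the reset in `finally`, the failed resumed run leaves the flag cleared, and the following fresh run in this sketch starts without a stale parent pointer.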


Labels

python (Usage: [Issues, PRs], Target: Python)
workflows (Usage: [Issues, PRs], Target: Workflows)

Projects

Status: In Review

Development

Successfully merging this pull request may close these issues.

Python: Preserve checkpoint ancestry when workflows resume from storage

3 participants