Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
ADDED

- Added `durabletask.scheduled`, a recurring schedule feature built on durable
entities. Use `configure_scheduled_tasks(worker)` to enable it on a worker,
entities. Use `worker.configure_scheduled_tasks()` to enable it on a worker,
then manage schedules from the client via `ScheduledTaskClient` (and the
per-schedule `ScheduleClient`). Supports creating, describing, listing,
updating, pausing, resuming, and deleting schedules with configurable
Expand Down
8 changes: 3 additions & 5 deletions durabletask/scheduled/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
"""Scheduled tasks support for the Durable Task SDK.

This package provides a recurring schedule feature built on top of durable
entities and a helper orchestrator. Register the entity and orchestrator with a
worker via :func:`configure_scheduled_tasks`, then manage schedules from the
client via :class:`ScheduledTaskClient`.
entities and a helper orchestrator. Enable it on a worker via
:meth:`durabletask.worker.TaskHubGrpcWorker.configure_scheduled_tasks`, then
manage schedules from the client via :class:`ScheduledTaskClient`.
"""

from durabletask.scheduled.client import ScheduleClient, ScheduledTaskClient
Expand All @@ -17,7 +17,6 @@
from durabletask.scheduled.models import (ScheduleCreationOptions,
ScheduleDescription, ScheduleQuery,
ScheduleUpdateOptions)
from durabletask.scheduled.registration import configure_scheduled_tasks
from durabletask.scheduled.schedule_status import ScheduleStatus

__all__ = [
Expand All @@ -32,7 +31,6 @@
"ScheduleNotFoundError",
"ScheduleClientValidationError",
"ScheduleInvalidTransitionError",
"configure_scheduled_tasks",
]

PACKAGE_NAME = "durabletask.scheduled"
22 changes: 0 additions & 22 deletions durabletask/scheduled/registration.py

This file was deleted.

32 changes: 27 additions & 5 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,8 @@ def __init__(
self._runLoop: Thread | None = None
# Extra worker capabilities advertised to the backend in
# GetWorkItemsRequest (in addition to ones derived from worker state such
# as LARGE_PAYLOADS). Feature-enablement helpers like
# durabletask.scheduled.configure_scheduled_tasks register theirs here.
# as LARGE_PAYLOADS). A feature-enablement helper like
# TaskHubGrpcWorker.configure_scheduled_tasks registers its own here.
self._capabilities: set[int] = set()
Comment thread
andystaples marked this conversation as resolved.

@property
Expand Down Expand Up @@ -645,16 +645,38 @@ def add_capability(self, capability: int) -> None:
"""Advertise a worker capability to the backend in ``GetWorkItemsRequest``.

Most users do not call this directly; feature-enablement helpers such as
:func:`durabletask.scheduled.configure_scheduled_tasks` use it to
advertise the capabilities (``pb.WORKER_CAPABILITY_*``) their feature
relies on.
:meth:`TaskHubGrpcWorker.configure_scheduled_tasks` use it to advertise
the capabilities (``pb.WORKER_CAPABILITY_*``) their feature relies on.
"""
if self._is_running:
raise RuntimeError(
"Capabilities cannot be added while the worker is running."
)
self._capabilities.add(capability)

def configure_scheduled_tasks(self) -> None:
"""Enable scheduled tasks support on this worker.

Registers the schedule entity and operation orchestrator and advertises
the scheduled-tasks capability to the backend. Call this before starting
the worker. Schedules are then managed from the client via
:class:`durabletask.scheduled.ScheduledTaskClient`.
"""
if self._is_running:
raise RuntimeError(
"Scheduled tasks cannot be configured while the worker is running."
)
# Imported lazily to avoid a circular import: durabletask.scheduled
# imports from durabletask.worker.
from durabletask.scheduled.orchestrator import (
execute_schedule_operation_orchestrator,
)
from durabletask.scheduled.schedule_entity import ENTITY_NAME, Schedule

self.add_entity(Schedule, ENTITY_NAME)
self.add_orchestrator(execute_schedule_operation_orchestrator)
self.add_capability(pb.WORKER_CAPABILITY_SCHEDULED_TASKS)

def use_versioning(self, version: VersioningOptions) -> None:
"""Initializes versioning options for sub-orchestrators and activities."""
if self._is_running:
Expand Down
5 changes: 2 additions & 3 deletions examples/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
from durabletask import task
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
from durabletask.scheduled import (ScheduledTaskClient, ScheduleCreationOptions,
configure_scheduled_tasks)
from durabletask.scheduled import ScheduledTaskClient, ScheduleCreationOptions


def greet_orchestrator(ctx: task.OrchestrationContext, name: str) -> Generator[task.Task[Any], Any, Any]:
Expand All @@ -41,7 +40,7 @@ def greet_orchestrator(ctx: task.OrchestrationContext, name: str) -> Generator[t
taskhub=taskhub_name, token_credential=credential) as worker:
worker.add_orchestrator(greet_orchestrator)
# Register the schedule entity and operation orchestrator.
configure_scheduled_tasks(worker)
worker.configure_scheduled_tasks()
worker.start()

client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
from durabletask.scheduled import (ScheduledTaskClient, ScheduleCreationOptions,
ScheduleQuery, ScheduleStatus,
ScheduleUpdateOptions,
configure_scheduled_tasks)
ScheduleUpdateOptions)

import os

Expand Down Expand Up @@ -61,7 +60,7 @@ def _make_worker() -> DurableTaskSchedulerWorker:
w = DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
taskhub=taskhub_name, token_credential=None)
w.add_orchestrator(target_orchestrator)
configure_scheduled_tasks(w)
w.configure_scheduled_tasks()
return w


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from datetime import datetime, timedelta, timezone

import durabletask.internal.orchestrator_service_pb2 as pb
from durabletask.scheduled import configure_scheduled_tasks
from durabletask.scheduled.client import ScheduledTaskClient
from durabletask.scheduled.models import (ScheduleConfiguration,
ScheduleCreationOptions, ScheduleQuery,
Expand Down Expand Up @@ -69,7 +68,7 @@ def test_status_filter(self):
class TestScheduledTasksCapability:
def test_configure_advertises_scheduled_tasks_capability(self):
worker = TaskHubGrpcWorker()
configure_scheduled_tasks(worker)
worker.configure_scheduled_tasks()
assert pb.WORKER_CAPABILITY_SCHEDULED_TASKS in worker._capabilities # pyright: ignore[reportPrivateUsage]

def test_capability_absent_by_default(self):
Expand Down
5 changes: 2 additions & 3 deletions tests/durabletask/scheduled/test_scheduled_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
from durabletask import client, task, worker
from durabletask.scheduled import (ScheduledTaskClient, ScheduleCreationOptions,
ScheduleQuery, ScheduleStatus,
ScheduleUpdateOptions,
configure_scheduled_tasks)
ScheduleUpdateOptions)
from durabletask.testing import create_test_backend

from tests.durabletask._port_utils import find_free_port
Expand Down Expand Up @@ -63,7 +62,7 @@ def target_orchestrator(ctx: task.OrchestrationContext, value):
def _make_worker() -> worker.TaskHubGrpcWorker:
w = worker.TaskHubGrpcWorker(host_address=HOST)
w.add_orchestrator(target_orchestrator)
configure_scheduled_tasks(w)
w.configure_scheduled_tasks()
return w


Expand Down
Loading