diff --git a/README.md b/README.md index feb43d48..4d7091bc 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,8 @@ Some examples require extra dependencies. See each sample's directory for specif * [Nexus Messaging](nexus_messaging): Demonstrates how send signal, update and query messages through Nexus. This contains two samples, one sending messages to an existing workflow and a second that creates a workflow through Nexus and sends messages to it. +* [nexus_standalone_operations](nexus_standalone_operations) - Execute Nexus operations directly from client code, +without wrapping them in a workflow. * [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry. * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`. * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. diff --git a/nexus_standalone_operations/README.md b/nexus_standalone_operations/README.md new file mode 100644 index 00000000..3307463a --- /dev/null +++ b/nexus_standalone_operations/README.md @@ -0,0 +1,66 @@ +This sample demonstrates how to execute Nexus operations directly from client code, +without wrapping them in a workflow. It shows both synchronous and asynchronous +(workflow-backed) operations, plus listing and counting operations. + + +### Temporal Python SDK support for Standalone Nexus Operations is at [Pre-release](https://docs.temporal.io/evaluate/development-production-features/release-stages#pre-release). + +All APIs are experimental and may be subject to backwards-incompatible changes. + +Standalone Nexus operations require a server version that supports this feature. Use the dev server build at https://github.com/temporalio/cli/releases/tag/v1.7.2-standalone-nexus-operations. + +### Sample directory structure + +- [service.py](./service.py) - Nexus service definition with echo (sync) and hello (async) operations +- [handler.py](./handler.py) - Nexus operation handlers and the backing workflow for the async operation +- [worker.py](./worker.py) - Temporal worker that hosts the Nexus service +- [starter.py](./starter.py) - Client that executes standalone Nexus operations + + +### Instructions + +Run the [Temporal dev server build that supports standalone Nexus operations](https://github.com/temporalio/cli/releases/tag/v1.7.2-standalone-nexus-operations). +(If you are going to run locally, you will want to start it in another terminal; this command is blocking and runs until it receives a SIGINT (Ctrl + C) command.) + +Start a Temporal dev server with the dynamic config flags required for standalone Nexus operations: + +```bash +temporal server start-dev \ + --dynamic-config-value "nexusoperation.enableStandalone=true" \ + --dynamic-config-value "history.enableChasmCallbacks=true" +``` + +Create the Nexus endpoint: + +``` +temporal operator nexus endpoint create \ + --name nexus-standalone-operations-endpoint \ + --target-namespace default \ + --target-task-queue nexus-standalone-operations +``` + +In one terminal, start the worker: +``` +uv run nexus_standalone_operations/worker.py +``` + +In another terminal, run the starter: +``` +uv run nexus_standalone_operations/starter.py +``` + +### Expected output + +``` +Echo result: hello +Hello result: Hello, World! + +Listing Nexus operations: + OperationId: echo-..., Operation: echo, Status: COMPLETED + OperationId: hello-..., Operation: hello, Status: COMPLETED + +Total Nexus operations: 2 +``` + +If you run the starter code multiple times, you should see additional operations in the listing results, as more operations are run. +The same goes for the total number of operations. \ No newline at end of file diff --git a/nexus_standalone_operations/__init__.py b/nexus_standalone_operations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_standalone_operations/handler.py b/nexus_standalone_operations/handler.py new file mode 100644 index 00000000..da18e947 --- /dev/null +++ b/nexus_standalone_operations/handler.py @@ -0,0 +1,45 @@ +"""Nexus service handler and backing workflow for standalone operations sample.""" + +from __future__ import annotations + +import uuid + +import nexusrpc.handler +from temporalio import nexus, workflow + +from nexus_standalone_operations.service import ( + EchoInput, + EchoOutput, + HelloInput, + HelloOutput, + MyNexusService, +) + + +@workflow.defn +class HelloWorkflow: + @workflow.run + async def run(self, input: HelloInput) -> HelloOutput: + return HelloOutput(greeting=f"Hello, {input.name}!") + + +@nexusrpc.handler.service_handler(service=MyNexusService) +class MyNexusServiceHandler: + @nexusrpc.handler.sync_operation + async def echo( + self, _ctx: nexusrpc.handler.StartOperationContext, input: EchoInput + ) -> EchoOutput: + return EchoOutput(message=input.message) + + @nexus.temporal_operation + async def hello( + self, + _ctx: nexus.TemporalStartOperationContext, + client: nexus.TemporalNexusClient, + input: HelloInput, + ) -> nexus.TemporalOperationResult[HelloOutput]: + return await client.start_workflow( + HelloWorkflow.run, + input, + id=str(uuid.uuid4()), + ) diff --git a/nexus_standalone_operations/service.py b/nexus_standalone_operations/service.py new file mode 100644 index 00000000..a8a906db --- /dev/null +++ b/nexus_standalone_operations/service.py @@ -0,0 +1,39 @@ +"""Nexus service definition for standalone operations sample. + +Defines a Nexus service with two operations: +- echo: a synchronous operation that echoes the input message +- hello: an asynchronous (workflow-backed) operation that returns a greeting + +This service definition is used by both the handler (to validate operation +signatures) and the client (to create type-safe nexus clients). +""" + +from dataclasses import dataclass + +import nexusrpc + + +@dataclass +class EchoInput: + message: str + + +@dataclass +class EchoOutput: + message: str + + +@dataclass +class HelloInput: + name: str + + +@dataclass +class HelloOutput: + greeting: str + + +@nexusrpc.service +class MyNexusService: + echo: nexusrpc.Operation[EchoInput, EchoOutput] + hello: nexusrpc.Operation[HelloInput, HelloOutput] diff --git a/nexus_standalone_operations/starter.py b/nexus_standalone_operations/starter.py new file mode 100644 index 00000000..11d52e53 --- /dev/null +++ b/nexus_standalone_operations/starter.py @@ -0,0 +1,74 @@ +"""Starter that demonstrates standalone Nexus operation execution. + +Unlike other Nexus samples that call operations from within a workflow, this +sample executes Nexus operations directly from client code using the standalone +Nexus operation APIs. +""" + +import asyncio +import uuid +from datetime import timedelta + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig + +from nexus_standalone_operations.service import ( + EchoInput, + HelloInput, + MyNexusService, +) + +ENDPOINT_NAME = "nexus-standalone-operations-endpoint" + + +async def main() -> None: + config = ClientConfig.load_client_connect_config() + _ = config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + # Create a typed NexusClient bound to the endpoint and service. + # The endpoint must be pre-created on the server (see README). + nexus_client = client.create_nexus_client( + service=MyNexusService, endpoint=ENDPOINT_NAME + ) + + # Start sync echo operation and await the result immediately. + echo_result = await nexus_client.execute_operation( + MyNexusService.echo, + EchoInput(message="hello"), + id=f"echo-{uuid.uuid4()}", + schedule_to_close_timeout=timedelta(seconds=10), + ) + print(f"Echo result: {echo_result.message}") + + # Start async (workflow-backed) hello operation and get a NexusOperationHandle. + handle = await nexus_client.start_operation( + MyNexusService.hello, + HelloInput(name="World"), + id=f"hello-{uuid.uuid4()}", + schedule_to_close_timeout=timedelta(seconds=10), + ) + + print(f"\nStarted `MyNexusService.Hello`. OperationID: {handle.operation_id}") + + # Use the NexusOperationHandle to await the result of the operation. + hello_result = await handle.result() + print(f"`MyNexusService.Hello` result: {hello_result.greeting}") + + # List nexus operations. + print("\nListing Nexus operations:") + query = f'Endpoint = "{ENDPOINT_NAME}"' + async for op in client.list_nexus_operations(query): + print( + f" OperationId: {op.operation_id},", + f" Operation: {op.operation},", + f" Status: {op.status.name}", + ) + + # Count nexus operations. + count = await client.count_nexus_operations(query) + print(f"\nTotal Nexus operations: {count.count}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/nexus_standalone_operations/worker.py b/nexus_standalone_operations/worker.py new file mode 100644 index 00000000..0de4ac3b --- /dev/null +++ b/nexus_standalone_operations/worker.py @@ -0,0 +1,41 @@ +"""Worker that hosts the Nexus service for standalone operations sample.""" + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler + +interrupt_event = asyncio.Event() + +TASK_QUEUE = "nexus-standalone-operations" + + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + _ = config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HelloWorkflow], + nexus_service_handlers=[MyNexusServiceHandler()], + ): + logging.info("Worker started, ctrl+c to exit") + _ = await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/tests/conftest.py b/tests/conftest.py index 65de246e..3d054e71 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -42,12 +42,17 @@ async def env(request) -> AsyncGenerator[WorkflowEnvironment, None]: env_type = request.config.getoption("--workflow-environment") if env_type == "local": env = await WorkflowEnvironment.start_local( + dev_server_download_version="v1.7.2-standalone-nexus-operations", dev_server_extra_args=[ "--dynamic-config-value", "frontend.enableExecuteMultiOperation=true", "--dynamic-config-value", "system.enableEagerWorkflowStart=true", - ] + "--dynamic-config-value", + "nexusoperation.enableStandalone=true", + "--dynamic-config-value", + "history.enableChasmCallbacks=true", + ], ) elif env_type == "time-skipping": env = await WorkflowEnvironment.start_time_skipping() diff --git a/tests/nexus_standalone_operations/__init__.py b/tests/nexus_standalone_operations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/nexus_standalone_operations/nexus_standalone_operations_test.py b/tests/nexus_standalone_operations/nexus_standalone_operations_test.py new file mode 100644 index 00000000..b0bd2dad --- /dev/null +++ b/tests/nexus_standalone_operations/nexus_standalone_operations_test.py @@ -0,0 +1,72 @@ +import uuid +from datetime import timedelta + +import pytest +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler +from nexus_standalone_operations.service import ( + EchoInput, + EchoOutput, + HelloInput, + HelloOutput, + MyNexusService, +) +from nexus_standalone_operations.worker import TASK_QUEUE +from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint + + +async def test_nexus_standalone_operations(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip("Time Skipping server does not support standalone nexus operations") + + endpoint_name = f"test-nexus-standalone-{uuid.uuid4()}" + + create_response = await create_nexus_endpoint( + name=endpoint_name, + task_queue=TASK_QUEUE, + client=client, + ) + try: + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HelloWorkflow], + nexus_service_handlers=[MyNexusServiceHandler()], + ): + nexus_client = client.create_nexus_client( + service=MyNexusService, endpoint=endpoint_name + ) + + # Test sync echo operation + echo_result = None + echo_result = await nexus_client.execute_operation( + MyNexusService.echo, + EchoInput(message="test-echo"), + id=str(uuid.uuid4()), + schedule_to_close_timeout=timedelta(seconds=10), + ) + assert isinstance(echo_result, EchoOutput) + assert echo_result.message == "test-echo" + + # Test async hello operation + hello_result = await nexus_client.execute_operation( + MyNexusService.hello, + HelloInput(name="Test"), + id=str(uuid.uuid4()), + schedule_to_close_timeout=timedelta(seconds=10), + ) + assert isinstance(hello_result, HelloOutput) + assert hello_result.greeting == "Hello, Test!" + + # Test count operations + count = await client.count_nexus_operations(f'Endpoint = "{endpoint_name}"') + assert count.count >= 0 + finally: + _ = await delete_nexus_endpoint( + id=create_response.endpoint.id, + version=create_response.endpoint.version, + client=client, + )