Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ Some examples require extra dependencies. See each sample's directory for specif
* [Nexus Messaging](nexus_messaging): Demonstrates how send signal, update and query messages through Nexus.
This contains two samples, one sending messages to an existing workflow and a second that creates a workflow through Nexus
and sends messages to it.
* [nexus_standalone_operations](nexus_standalone_operations) - Execute Nexus operations directly from client code,
without wrapping them in a workflow.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
Expand Down
66 changes: 66 additions & 0 deletions nexus_standalone_operations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
This sample demonstrates how to execute Nexus operations directly from client code,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should call out this feature is not stable near the top

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a blurb about SANO being pre-release and the stability guarantees that implies. Copied from the docs PR.

without wrapping them in a workflow. It shows both synchronous and asynchronous
(workflow-backed) operations, plus listing and counting operations.


### Temporal Python SDK support for Standalone Nexus Operations is at [Pre-release](https://docs.temporal.io/evaluate/development-production-features/release-stages#pre-release).

All APIs are experimental and may be subject to backwards-incompatible changes.

Standalone Nexus operations require a server version that supports this feature. Use the dev server build at https://github.com/temporalio/cli/releases/tag/v1.7.2-standalone-nexus-operations.

### Sample directory structure

- [service.py](./service.py) - Nexus service definition with echo (sync) and hello (async) operations
- [handler.py](./handler.py) - Nexus operation handlers and the backing workflow for the async operation
- [worker.py](./worker.py) - Temporal worker that hosts the Nexus service
- [starter.py](./starter.py) - Client that executes standalone Nexus operations


### Instructions

Run the [Temporal dev server build that supports standalone Nexus operations](https://github.com/temporalio/cli/releases/tag/v1.7.2-standalone-nexus-operations).
(If you are going to run locally, you will want to start it in another terminal; this command is blocking and runs until it receives a SIGINT (Ctrl + C) command.)

Start a Temporal dev server with the dynamic config flags required for standalone Nexus operations:

```bash
temporal server start-dev \
--dynamic-config-value "nexusoperation.enableStandalone=true" \
--dynamic-config-value "history.enableChasmCallbacks=true"
```

Create the Nexus endpoint:

```
temporal operator nexus endpoint create \
--name nexus-standalone-operations-endpoint \
--target-namespace default \
--target-task-queue nexus-standalone-operations
```

In one terminal, start the worker:
```
uv run nexus_standalone_operations/worker.py
```

In another terminal, run the starter:
```
uv run nexus_standalone_operations/starter.py
```

### Expected output

```
Echo result: hello
Hello result: Hello, World!

Listing Nexus operations:
OperationId: echo-..., Operation: echo, Status: COMPLETED
OperationId: hello-..., Operation: hello, Status: COMPLETED

Total Nexus operations: 2
```

If you run the starter code multiple times, you should see additional operations in the listing results, as more operations are run.
The same goes for the total number of operations.
Empty file.
45 changes: 45 additions & 0 deletions nexus_standalone_operations/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Nexus service handler and backing workflow for standalone operations sample."""

from __future__ import annotations

import uuid

import nexusrpc.handler
from temporalio import nexus, workflow

from nexus_standalone_operations.service import (
EchoInput,
EchoOutput,
HelloInput,
HelloOutput,
MyNexusService,
)


@workflow.defn
class HelloWorkflow:
@workflow.run
async def run(self, input: HelloInput) -> HelloOutput:
return HelloOutput(greeting=f"Hello, {input.name}!")


@nexusrpc.handler.service_handler(service=MyNexusService)
class MyNexusServiceHandler:
@nexusrpc.handler.sync_operation
async def echo(
self, _ctx: nexusrpc.handler.StartOperationContext, input: EchoInput
) -> EchoOutput:
return EchoOutput(message=input.message)

@nexus.temporal_operation
async def hello(
self,
_ctx: nexus.TemporalStartOperationContext,
client: nexus.TemporalNexusClient,
input: HelloInput,
) -> nexus.TemporalOperationResult[HelloOutput]:
return await client.start_workflow(
HelloWorkflow.run,
input,
id=str(uuid.uuid4()),
)
39 changes: 39 additions & 0 deletions nexus_standalone_operations/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Nexus service definition for standalone operations sample.

Defines a Nexus service with two operations:
- echo: a synchronous operation that echoes the input message
- hello: an asynchronous (workflow-backed) operation that returns a greeting

This service definition is used by both the handler (to validate operation
signatures) and the client (to create type-safe nexus clients).
"""

from dataclasses import dataclass

import nexusrpc


@dataclass
class EchoInput:
message: str


@dataclass
class EchoOutput:
message: str


@dataclass
class HelloInput:
name: str


@dataclass
class HelloOutput:
greeting: str


@nexusrpc.service
class MyNexusService:
echo: nexusrpc.Operation[EchoInput, EchoOutput]
hello: nexusrpc.Operation[HelloInput, HelloOutput]
74 changes: 74 additions & 0 deletions nexus_standalone_operations/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Starter that demonstrates standalone Nexus operation execution.

Unlike other Nexus samples that call operations from within a workflow, this
sample executes Nexus operations directly from client code using the standalone
Nexus operation APIs.
"""

import asyncio
import uuid
from datetime import timedelta

from temporalio.client import Client
from temporalio.envconfig import ClientConfig

from nexus_standalone_operations.service import (
EchoInput,
HelloInput,
MyNexusService,
)

ENDPOINT_NAME = "nexus-standalone-operations-endpoint"


async def main() -> None:
config = ClientConfig.load_client_connect_config()
_ = config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)

# Create a typed NexusClient bound to the endpoint and service.
# The endpoint must be pre-created on the server (see README).
nexus_client = client.create_nexus_client(
service=MyNexusService, endpoint=ENDPOINT_NAME
)

# Start sync echo operation and await the result immediately.
echo_result = await nexus_client.execute_operation(
MyNexusService.echo,
EchoInput(message="hello"),
id=f"echo-{uuid.uuid4()}",
schedule_to_close_timeout=timedelta(seconds=10),
)
print(f"Echo result: {echo_result.message}")

# Start async (workflow-backed) hello operation and get a NexusOperationHandle.
handle = await nexus_client.start_operation(
MyNexusService.hello,
HelloInput(name="World"),
id=f"hello-{uuid.uuid4()}",
schedule_to_close_timeout=timedelta(seconds=10),
)

print(f"\nStarted `MyNexusService.Hello`. OperationID: {handle.operation_id}")

# Use the NexusOperationHandle to await the result of the operation.
hello_result = await handle.result()
print(f"`MyNexusService.Hello` result: {hello_result.greeting}")

# List nexus operations.
print("\nListing Nexus operations:")
query = f'Endpoint = "{ENDPOINT_NAME}"'
async for op in client.list_nexus_operations(query):
print(
f" OperationId: {op.operation_id},",
f" Operation: {op.operation},",
f" Status: {op.status.name}",
)

# Count nexus operations.
count = await client.count_nexus_operations(query)
print(f"\nTotal Nexus operations: {count.count}")


if __name__ == "__main__":
asyncio.run(main())
41 changes: 41 additions & 0 deletions nexus_standalone_operations/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Worker that hosts the Nexus service for standalone operations sample."""

import asyncio
import logging

from temporalio.client import Client
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler

interrupt_event = asyncio.Event()

TASK_QUEUE = "nexus-standalone-operations"


async def main() -> None:
logging.basicConfig(level=logging.INFO)

config = ClientConfig.load_client_connect_config()
_ = config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[HelloWorkflow],
nexus_service_handlers=[MyNexusServiceHandler()],
):
logging.info("Worker started, ctrl+c to exit")
_ = await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
7 changes: 6 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,17 @@ async def env(request) -> AsyncGenerator[WorkflowEnvironment, None]:
env_type = request.config.getoption("--workflow-environment")
if env_type == "local":
env = await WorkflowEnvironment.start_local(
dev_server_download_version="v1.7.2-standalone-nexus-operations",
dev_server_extra_args=[
"--dynamic-config-value",
"frontend.enableExecuteMultiOperation=true",
"--dynamic-config-value",
"system.enableEagerWorkflowStart=true",
]
"--dynamic-config-value",
"nexusoperation.enableStandalone=true",
"--dynamic-config-value",
"history.enableChasmCallbacks=true",
],
)
elif env_type == "time-skipping":
env = await WorkflowEnvironment.start_time_skipping()
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import uuid
from datetime import timedelta

import pytest
from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler
from nexus_standalone_operations.service import (
EchoInput,
EchoOutput,
HelloInput,
HelloOutput,
MyNexusService,
)
from nexus_standalone_operations.worker import TASK_QUEUE
from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint


async def test_nexus_standalone_operations(client: Client, env: WorkflowEnvironment):
if env.supports_time_skipping:
pytest.skip("Time Skipping server does not support standalone nexus operations")

endpoint_name = f"test-nexus-standalone-{uuid.uuid4()}"

create_response = await create_nexus_endpoint(
name=endpoint_name,
task_queue=TASK_QUEUE,
client=client,
)
try:
async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[HelloWorkflow],
nexus_service_handlers=[MyNexusServiceHandler()],
):
nexus_client = client.create_nexus_client(
service=MyNexusService, endpoint=endpoint_name
)

# Test sync echo operation
echo_result = None
echo_result = await nexus_client.execute_operation(
MyNexusService.echo,
EchoInput(message="test-echo"),
id=str(uuid.uuid4()),
schedule_to_close_timeout=timedelta(seconds=10),
)
assert isinstance(echo_result, EchoOutput)
assert echo_result.message == "test-echo"

# Test async hello operation
hello_result = await nexus_client.execute_operation(
MyNexusService.hello,
HelloInput(name="Test"),
id=str(uuid.uuid4()),
schedule_to_close_timeout=timedelta(seconds=10),
)
assert isinstance(hello_result, HelloOutput)
assert hello_result.greeting == "Hello, Test!"

# Test count operations
count = await client.count_nexus_operations(f'Endpoint = "{endpoint_name}"')
assert count.count >= 0
finally:
_ = await delete_nexus_endpoint(
id=create_response.endpoint.id,
version=create_response.endpoint.version,
client=client,
)
Loading