Skip to content

Commit e4fd242

Browse files
committed
feat(copilot): add local PC executor shim binding
Platform-side implementation for routing copilot execution to the user's local machine via the autogpt-local-executor shim. - ShimConnectionManager: in-memory WebSocket registry with wait_for() - LocalPCShim: duck-type drop-in for E2B AsyncSandbox - .commands.run(), .files.read(), .files.write(), .pause(), .kill() - WebSocket endpoint: /ws/local-executor/{session_id}?token=<access_token> - Validates token via introspect_token() before accepting connection - HELLO/HELLO_ACK handshake - Wire into _setup_e2b(): LocalPC branch runs before E2B branch - Pause guard: skip pause_sandbox_direct for LocalPCShim instances - Config fields: use_local_pc_executor, allow_computer_use, local_pc_executor_ws_path Requires: autogpt-local-executor shim running on user's machine
1 parent 3cad995 commit e4fd242

6 files changed

Lines changed: 298 additions & 1 deletion

File tree

autogpt_platform/backend/backend/api/features/local_executor/__init__.py

Whitespace-only changes.
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
"""
2+
WebSocket endpoint for the autogpt-local-executor shim.
3+
4+
The shim dials in with:
5+
ws://<host>/ws/local-executor/<session_id>?token=<access_token>
6+
7+
Auth: the token is validated via introspect_token() before the connection
8+
is accepted. On success the WebSocket is registered in ShimConnectionManager
9+
so LocalPCShim.for_session() can find it.
10+
"""
11+
12+
from __future__ import annotations
13+
14+
import logging
15+
16+
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
17+
18+
from backend.copilot.tools.local_pc_shim import get_shim_manager
19+
from backend.data.auth.oauth import introspect_token
20+
21+
logger = logging.getLogger(__name__)
22+
23+
router = APIRouter()
24+
25+
26+
@router.websocket("/ws/local-executor/{session_id}")
27+
async def local_executor_ws(session_id: str, websocket: WebSocket) -> None:
28+
token = websocket.query_params.get("token", "")
29+
if not token:
30+
await websocket.close(code=4401, reason="Missing token")
31+
return
32+
33+
try:
34+
token_info = await introspect_token(token, token_type_hint="access_token")
35+
if not token_info or not token_info.get("active"):
36+
await websocket.close(code=4401, reason="Invalid or expired token")
37+
return
38+
except Exception:
39+
logger.exception("[LocalPC] Token introspection failed for session %s", session_id[:12])
40+
await websocket.close(code=4500, reason="Auth error")
41+
return
42+
43+
await websocket.accept()
44+
45+
# Handshake: expect HELLO, send HELLO_ACK
46+
try:
47+
import json, time, uuid
48+
raw = await websocket.receive_text()
49+
hello = json.loads(raw)
50+
if hello.get("type") != "HELLO":
51+
await websocket.close(code=4400, reason="Expected HELLO")
52+
return
53+
54+
granted = hello.get("payload", {}).get("capabilities", [])
55+
ack = {
56+
"type": "HELLO_ACK",
57+
"id": str(uuid.uuid4()),
58+
"ts": time.time(),
59+
"payload": {
60+
"session_id": session_id,
61+
"granted_capabilities": granted,
62+
"server_version": "0.0.1",
63+
},
64+
}
65+
await websocket.send_text(json.dumps(ack))
66+
except Exception:
67+
logger.exception("[LocalPC] Handshake failed for session %s", session_id[:12])
68+
await websocket.close(code=4500, reason="Handshake error")
69+
return
70+
71+
manager = get_shim_manager()
72+
manager.register(session_id, websocket)
73+
logger.info("[LocalPC] Shim connected for session %s", session_id[:12])
74+
75+
try:
76+
# Keep connection alive; LocalPCShim._recv_loop drives the traffic
77+
while True:
78+
await websocket.receive_text()
79+
except (WebSocketDisconnect, Exception):
80+
pass
81+
finally:
82+
manager.unregister(session_id)
83+
logger.info("[LocalPC] Shim disconnected for session %s", session_id[:12])

autogpt_platform/backend/backend/api/rest_api.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import backend.api.features.oauth
3434
import backend.api.features.otto.routes
3535
import backend.api.features.platform_linking.routes
36+
import backend.api.features.local_executor.routes as local_executor_routes
3637
import backend.api.features.postmark.postmark
3738
import backend.api.features.push.routes as push_routes
3839
import backend.api.features.store.model
@@ -405,6 +406,10 @@ async def validation_error_handler(
405406
prefix="/api/platform-linking",
406407
)
407408

409+
app.include_router(
410+
local_executor_routes.router,
411+
tags=["experimental", "local-executor"],
412+
)
408413
app.mount("/external-api", external_api)
409414

410415

autogpt_platform/backend/backend/copilot/config.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,23 @@ class ChatConfig(BaseSettings):
393393
description="E2B lifecycle action on timeout: 'pause' (default, free) or 'kill'.",
394394
)
395395

396+
# --- Local PC executor ---
397+
use_local_pc_executor: bool = Field(
398+
default=False,
399+
description=(
400+
"Route copilot execution to the user's local machine via the "
401+
"autogpt-local-executor shim. When True, LocalPCShim is used instead of E2B."
402+
),
403+
)
404+
allow_computer_use: bool = Field(
405+
default=False,
406+
description="Allow Claude computer-use beta tools when the local PC shim is active.",
407+
)
408+
local_pc_executor_ws_path: str = Field(
409+
default="/ws/local-executor",
410+
description="WebSocket path prefix for the local executor shim endpoint.",
411+
)
412+
396413
@property
397414
def openrouter_active(self) -> bool:
398415
"""True when OpenRouter config is shape-valid (flag + credentials).

autogpt_platform/backend/backend/copilot/sdk/service.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
from ..token_tracking import persist_and_record_usage
121121
from ..tools import ToolGroup
122122
from ..tools.e2b_sandbox import get_or_create_sandbox, pause_sandbox_direct
123+
from ..tools.local_pc_shim import LocalPCShim, get_shim_manager
123124
from ..tools.sandbox import WORKSPACE_PREFIX, make_session_path
124125
from ..tracking import track_user_message
125126
from ..transcript import (
@@ -3812,6 +3813,19 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues
38123813

38133814
async def _setup_e2b():
38143815
"""Set up E2B sandbox if configured, return sandbox or None."""
3816+
if config.use_local_pc_executor:
3817+
try:
3818+
shim = await LocalPCShim.for_session(
3819+
session_id, manager=get_shim_manager(), connect_timeout=30.0
3820+
)
3821+
return shim
3822+
except Exception as shim_err:
3823+
logger.error(
3824+
"[LocalPC] [%s] Shim connection failed: %s",
3825+
session_id[:12],
3826+
shim_err,
3827+
)
3828+
return None
38153829
if not (e2b_api_key := config.active_e2b_api_key):
38163830
if config.use_e2b_sandbox:
38173831
logger.warning(
@@ -4919,7 +4933,7 @@ def _on_stderr(line: str) -> None:
49194933
# _background_tasks to prevent garbage collection.
49204934
# Use pause_sandbox_direct to skip the Redis lookup and reconnect
49214935
# round-trip — e2b_sandbox is the live object from this turn.
4922-
if e2b_sandbox is not None:
4936+
if e2b_sandbox is not None and not isinstance(e2b_sandbox, LocalPCShim):
49234937
task = asyncio.create_task(pause_sandbox_direct(e2b_sandbox, session_id))
49244938
_background_tasks.add(task)
49254939
task.add_done_callback(_background_tasks.discard)
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
"""
2+
Platform-side binding for the autogpt-local-executor shim.
3+
"""
4+
5+
from __future__ import annotations
6+
7+
import asyncio
8+
import json
9+
import logging
10+
import time
11+
import uuid
12+
from typing import Any
13+
14+
from fastapi import WebSocket
15+
16+
logger = logging.getLogger(__name__)
17+
18+
_shim_manager: "ShimConnectionManager | None" = None
19+
20+
21+
def get_shim_manager() -> "ShimConnectionManager":
22+
global _shim_manager
23+
if _shim_manager is None:
24+
_shim_manager = ShimConnectionManager()
25+
return _shim_manager
26+
27+
28+
class ShimConnectionManager:
29+
def __init__(self) -> None:
30+
self._connections: dict[str, WebSocket] = {}
31+
self._waiters: dict[str, list[asyncio.Future[WebSocket]]] = {}
32+
33+
def register(self, session_id: str, ws: WebSocket) -> None:
34+
self._connections[session_id] = ws
35+
for fut in self._waiters.pop(session_id, []):
36+
if not fut.done():
37+
fut.set_result(ws)
38+
logger.info("[LocalPC] Shim registered for session %s", session_id[:12])
39+
40+
def unregister(self, session_id: str) -> None:
41+
self._connections.pop(session_id, None)
42+
logger.info("[LocalPC] Shim unregistered for session %s", session_id[:12])
43+
44+
async def wait_for(self, session_id: str, timeout: float = 30.0) -> WebSocket:
45+
if session_id in self._connections:
46+
return self._connections[session_id]
47+
loop = asyncio.get_event_loop()
48+
fut: asyncio.Future[WebSocket] = loop.create_future()
49+
self._waiters.setdefault(session_id, []).append(fut)
50+
try:
51+
return await asyncio.wait_for(asyncio.shield(fut), timeout=timeout)
52+
except asyncio.TimeoutError:
53+
raise TimeoutError(
54+
f"[LocalPC] Shim for session {session_id[:12]} did not connect within {timeout}s"
55+
)
56+
57+
def get(self, session_id: str) -> WebSocket | None:
58+
return self._connections.get(session_id)
59+
60+
61+
class _FilesProxy:
62+
def __init__(self, shim: "LocalPCShim") -> None:
63+
self._shim = shim
64+
65+
async def read(self, path: str, *, encoding: str = "utf-8") -> str:
66+
resp = await self._shim._rpc("FILE_READ", {"path": path, "encoding": encoding})
67+
if resp.get("type") == "ERROR":
68+
raise OSError(resp["payload"].get("message", "FILE_READ failed"))
69+
return resp["payload"]["content"]
70+
71+
async def write(self, path: str, content: str, *, encoding: str = "utf-8") -> None:
72+
resp = await self._shim._rpc(
73+
"FILE_WRITE",
74+
{"path": path, "content": content, "encoding": encoding, "create_parents": True},
75+
)
76+
if resp.get("type") == "ERROR":
77+
raise OSError(resp["payload"].get("message", "FILE_WRITE failed"))
78+
79+
80+
class _CommandsProxy:
81+
def __init__(self, shim: "LocalPCShim") -> None:
82+
self._shim = shim
83+
84+
async def run(
85+
self,
86+
command: str,
87+
*,
88+
cwd: str | None = None,
89+
timeout: int | None = None,
90+
envs: dict[str, str] | None = None,
91+
) -> Any:
92+
payload: dict[str, Any] = {"command": command}
93+
if cwd:
94+
payload["cwd"] = cwd
95+
if timeout:
96+
payload["timeout_seconds"] = timeout
97+
if envs:
98+
payload["env"] = envs
99+
resp = await self._shim._rpc("EXECUTE_COMMAND", payload)
100+
if resp.get("type") == "ERROR":
101+
raise RuntimeError(resp["payload"].get("message", "EXECUTE_COMMAND failed"))
102+
return _CommandResult(resp["payload"])
103+
104+
105+
class _CommandResult:
106+
def __init__(self, payload: dict) -> None:
107+
self.stdout = payload.get("stdout", "")
108+
self.stderr = payload.get("stderr", "")
109+
self.exit_code = payload.get("exit_code", -1)
110+
self.timed_out = payload.get("timed_out", False)
111+
112+
113+
class LocalPCShim:
114+
"""
115+
Drop-in replacement for E2B AsyncSandbox that routes execution to the
116+
user's local machine via the autogpt-local-executor shim.
117+
118+
Duck-type contract: .commands.run(), .files.read(), .files.write(),
119+
.pause(), .kill(), .sandbox_id
120+
"""
121+
122+
def __init__(self, session_id: str, ws: WebSocket) -> None:
123+
self.sandbox_id = session_id
124+
self._ws = ws
125+
self._pending: dict[str, asyncio.Future[dict]] = {}
126+
self.files = _FilesProxy(self)
127+
self.commands = _CommandsProxy(self)
128+
self._recv_task = asyncio.create_task(self._recv_loop())
129+
130+
@classmethod
131+
async def for_session(
132+
cls,
133+
session_id: str,
134+
*,
135+
manager: ShimConnectionManager,
136+
connect_timeout: float = 30.0,
137+
) -> "LocalPCShim":
138+
ws = await manager.wait_for(session_id, timeout=connect_timeout)
139+
return cls(session_id, ws)
140+
141+
async def _rpc(self, msg_type: str, payload: dict, *, timeout: float = 30.0) -> dict:
142+
msg_id = str(uuid.uuid4())
143+
msg = {"type": msg_type, "id": msg_id, "ts": time.time(), "payload": payload}
144+
loop = asyncio.get_event_loop()
145+
fut: asyncio.Future[dict] = loop.create_future()
146+
self._pending[msg_id] = fut
147+
try:
148+
await self._ws.send_text(json.dumps(msg))
149+
return await asyncio.wait_for(asyncio.shield(fut), timeout=timeout)
150+
except asyncio.TimeoutError:
151+
self._pending.pop(msg_id, None)
152+
raise TimeoutError(f"[LocalPC] RPC {msg_type} timed out after {timeout}s")
153+
154+
async def _recv_loop(self) -> None:
155+
try:
156+
async for raw in self._ws.iter_text():
157+
try:
158+
msg = json.loads(raw)
159+
msg_id = msg.get("id")
160+
if msg_id and msg_id in self._pending:
161+
fut = self._pending.pop(msg_id)
162+
if not fut.done():
163+
fut.set_result(msg)
164+
except Exception:
165+
logger.exception("[LocalPC] Error processing shim message")
166+
except Exception:
167+
logger.debug("[LocalPC] Shim recv loop ended for %s", self.sandbox_id[:12])
168+
169+
async def pause(self) -> None:
170+
pass # no billing on local machine
171+
172+
async def kill(self) -> None:
173+
try:
174+
await self._ws.close()
175+
except Exception:
176+
pass
177+
if not self._recv_task.done():
178+
self._recv_task.cancel()

0 commit comments

Comments
 (0)