ria-toolkit-oss/src/ria_toolkit_oss/server/routers/orchestrator.py

113 lines
4.0 KiB
Python
Raw Normal View History

2026-03-11 10:27:18 -04:00
"""Orchestrator routes: campaign deployment, status, and cancellation."""
from __future__ import annotations
import threading
import time
import uuid
from typing import Any
from fastapi import APIRouter, HTTPException, status
from ria_toolkit_oss.orchestration.campaign import CampaignConfig
from ria_toolkit_oss.orchestration.executor import CampaignExecutor
from ..models import (
CampaignStatusResponse,
CancelResponse,
DeployRequest,
DeployResponse,
)
from ..state import (
CampaignCancelledError,
CampaignState,
get_campaign,
set_campaign,
update_campaign,
)
router = APIRouter()
def _make_progress_cb(campaign_id: str, cancel_event: threading.Event):
def cb(step_index: int, total_steps: int, step_result: Any) -> None:
update_campaign(campaign_id, progress=step_index)
if cancel_event.is_set():
raise CampaignCancelledError(f"Cancelled at step {step_index}/{total_steps}")
return cb
def _run_campaign_thread(campaign_id: str, cfg: CampaignConfig) -> None:
state = get_campaign(campaign_id)
try:
result = CampaignExecutor(
config=cfg,
progress_cb=_make_progress_cb(campaign_id, state.cancel_event),
).run()
update_campaign(
campaign_id, status="completed", progress=cfg.total_steps(), result=result.to_dict(), ended_at=time.time()
)
except CampaignCancelledError:
update_campaign(campaign_id, status="cancelled", ended_at=time.time())
except Exception as e:
update_campaign(campaign_id, status="failed", error=str(e), ended_at=time.time())
@router.post("/deploy", response_model=DeployResponse)
async def deploy(request: DeployRequest):
"""Deploy a campaign config and start execution. Returns a ``campaign_id`` for polling.
Cancellation takes effect at step boundaries, not mid-capture.
2026-03-31 13:51:10 -04:00
External scripts are not permitted in server-deployed campaigns. Configure
transmitters without the ``script`` field, or run campaigns via the CLI.
2026-03-11 10:27:18 -04:00
"""
try:
cfg = CampaignConfig.from_dict(request.config)
except (ValueError, KeyError) as e:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(e))
2026-04-01 11:57:59 -04:00
if cfg.transmitters and any(t.script for t in cfg.transmitters):
2026-03-31 13:51:10 -04:00
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="External scripts are not permitted in server-deployed campaigns. "
"Remove the 'script' field from all transmitters, or run the campaign via the CLI.",
)
2026-03-11 10:27:18 -04:00
campaign_id = str(uuid.uuid4())
cancel_event = threading.Event()
thread = threading.Thread(target=_run_campaign_thread, args=(campaign_id, cfg), daemon=True)
set_campaign(
CampaignState(
campaign_id=campaign_id,
status="running",
config_name=cfg.name,
cancel_event=cancel_event,
thread=thread,
total_steps=cfg.total_steps(),
)
)
thread.start()
return DeployResponse(campaign_id=campaign_id)
@router.get("/status/{campaign_id}", response_model=CampaignStatusResponse)
async def get_status(campaign_id: str):
"""Get the status and progress of a deployed campaign."""
state = get_campaign(campaign_id)
if not state:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Campaign {campaign_id!r} not found")
return CampaignStatusResponse(**state.to_dict())
@router.post("/cancel/{campaign_id}", response_model=CancelResponse)
async def cancel(campaign_id: str):
"""Request cancellation. Takes effect at the next step boundary."""
state = get_campaign(campaign_id)
if not state:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Campaign {campaign_id!r} not found")
if state.status != "running":
return CancelResponse(campaign_id=campaign_id, cancelled=False)
state.cancel_event.set()
return CancelResponse(campaign_id=campaign_id, cancelled=True)