refactor(orchestrator): Audit and cleanup native_orchestrator.py
This commit is contained in:
@@ -2,19 +2,23 @@ from pathlib import Path
|
||||
from typing import Optional
|
||||
import json
|
||||
import re
|
||||
from src import paths
|
||||
|
||||
def read_plan(track_id: str, base_dir: str = ".") -> str:
|
||||
plan_path = Path(base_dir) / "conductor" / "tracks" / track_id / "plan.md"
|
||||
"""Reads the implementation plan (plan.md) for a track."""
|
||||
plan_path = paths.get_track_state_dir(track_id, base_dir) / "plan.md"
|
||||
if not plan_path.exists():
|
||||
return ""
|
||||
return plan_path.read_text(encoding="utf-8")
|
||||
|
||||
def write_plan(track_id: str, content: str, base_dir: str = ".") -> None:
|
||||
plan_path = Path(base_dir) / "conductor" / "tracks" / track_id / "plan.md"
|
||||
"""Writes the implementation plan (plan.md) for a track."""
|
||||
plan_path = paths.get_track_state_dir(track_id, base_dir) / "plan.md"
|
||||
plan_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
plan_path.write_text(content, encoding="utf-8")
|
||||
|
||||
def parse_plan_tasks(content: str) -> list[dict[str, str]]:
|
||||
"""Parses the tasks from a plan.md file."""
|
||||
tasks = []
|
||||
for line in content.split("\n"):
|
||||
stripped = line.strip()
|
||||
@@ -25,21 +29,25 @@ def parse_plan_tasks(content: str) -> list[dict[str, str]]:
|
||||
return tasks
|
||||
|
||||
def read_metadata(track_id: str, base_dir: str = ".") -> dict:
|
||||
meta_path = Path(base_dir) / "conductor" / "tracks" / track_id / "metadata.json"
|
||||
"""Reads the metadata (metadata.json) for a track."""
|
||||
meta_path = paths.get_track_state_dir(track_id, base_dir) / "metadata.json"
|
||||
if not meta_path.exists():
|
||||
return {}
|
||||
return json.loads(meta_path.read_text(encoding="utf-8"))
|
||||
|
||||
def write_metadata(track_id: str, data: dict, base_dir: str = ".") -> None:
|
||||
meta_path = Path(base_dir) / "conductor" / "tracks" / track_id / "metadata.json"
|
||||
"""Writes the metadata (metadata.json) for a track."""
|
||||
meta_path = paths.get_track_state_dir(track_id, base_dir) / "metadata.json"
|
||||
meta_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
meta_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
|
||||
|
||||
def get_track_dir(track_id: str, base_dir: str = ".") -> Path:
|
||||
return Path(base_dir) / "conductor" / "tracks" / track_id
|
||||
"""Returns the state directory for a specific track."""
|
||||
return paths.get_track_state_dir(track_id, base_dir)
|
||||
|
||||
def get_archive_dir(base_dir: str = ".") -> Path:
|
||||
return Path(base_dir) / "conductor" / "archive"
|
||||
"""Returns the central archive directory for completed tracks."""
|
||||
return paths.get_archive_dir(base_dir)
|
||||
|
||||
class NativeOrchestrator:
|
||||
def __init__(self, base_dir: str = "."):
|
||||
|
||||
Reference in New Issue
Block a user