Private
Public Access
0
0

some organization pass, still need to review a bunch

This commit is contained in:
2026-06-06 00:21:36 -04:00
parent f8b0a1243d
commit 053f5d867a
18 changed files with 658 additions and 706 deletions
+1 -1
View File
@@ -4170,7 +4170,7 @@ def render_operations_hub(app: App) -> None:
with imscope.tab_item("Vendor State") as (exp, _):
if exp: render_vendor_state(app)
def render_vendor_state(app: App) -> None:
def render_vendor_state(app: App) -> None: # TODO(Ed): Shouldn't this just be a part of usage analytics? We can show all used vendors at once...
"""Render the Operations Hub > Vendor State panel.
[C: src/vendor_state.py:get_vendor_state]
"""
+1 -1
View File
@@ -121,4 +121,4 @@ class PresetManager:
raise ValueError(f"Cannot save to {path}: Parent directory {path.parent} is a file.")
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "wb") as f:
f.write(tomli_w.dumps(data).encode("utf-8"))
f.write(tomli_w.dumps(data).encode("utf-8"))
+6 -8
View File
@@ -35,9 +35,8 @@ def parse_ts(s: str) -> Optional[datetime.datetime]:
def entry_to_str(entry: dict[str, Any]) -> str:
"""
Serialise a disc entry dict -> stored string.
[C: tests/test_thinking_persistence.py:test_entry_to_str_with_thinking]
Serialise a disc entry dict -> stored string.
[C: tests/test_thinking_persistence.py:test_entry_to_str_with_thinking]
"""
ts = entry.get("ts", "")
role = entry.get("role", "User")
@@ -56,15 +55,14 @@ def entry_to_str(entry: dict[str, Any]) -> str:
def format_discussion(entries: list[dict[str, Any]]) -> str:
"""
Convert a list of discussion entry dicts into a single formatted string.
Convert a list of discussion entry dicts into a single formatted string.
"""
return "\n\n".join([entry_to_str(e) for e in entries])
def str_to_entry(raw: str, roles: list[str]) -> dict[str, Any]:
"""
Parse a stored string back to a disc entry dict.
[C: tests/test_thinking_persistence.py:test_str_to_entry_with_thinking]
Parse a stored string back to a disc entry dict.
[C: tests/test_thinking_persistence.py:test_str_to_entry_with_thinking]
"""
ts = ""
rest = raw
@@ -492,4 +490,4 @@ def promote_take(project_dict: dict, take_id: str, new_id: str) -> None:
# If the take was active, update the active pointer
if project_dict["discussion"].get("active") == take_id:
project_dict["discussion"]["active"] = new_id
project_dict["discussion"]["active"] = new_id
+235 -237
View File
@@ -6,288 +6,286 @@ import sys
from typing import List, Dict, Any, Optional
from src import ai_client
from src import models
from src import mcp_client
from src.file_cache import ASTParser
_SENTENCE_TRANSFORMERS = None
_GOOGLE_GENAI = None
_CHROMADB = None
_GOOGLE_GENAI = None
_CHROMADB = None
def _get_sentence_transformers():
global _SENTENCE_TRANSFORMERS
if _SENTENCE_TRANSFORMERS is None:
try:
from sentence_transformers import SentenceTransformer
_SENTENCE_TRANSFORMERS = SentenceTransformer
except Exception as e:
sys.stderr.write(f"FAILED to import sentence_transformers: {e}\n")
sys.stderr.flush()
raise e
return _SENTENCE_TRANSFORMERS
global _SENTENCE_TRANSFORMERS
if _SENTENCE_TRANSFORMERS is None:
try:
from sentence_transformers import SentenceTransformer
_SENTENCE_TRANSFORMERS = SentenceTransformer
except Exception as e:
sys.stderr.write(f"FAILED to import sentence_transformers: {e}\n")
sys.stderr.flush()
raise e
return _SENTENCE_TRANSFORMERS
def _get_google_genai():
global _GOOGLE_GENAI
if _GOOGLE_GENAI is None:
from google import genai
from google.genai import types
_GOOGLE_GENAI = (genai, types)
return _GOOGLE_GENAI
global _GOOGLE_GENAI
if _GOOGLE_GENAI is None:
from google import genai
from google.genai import types
_GOOGLE_GENAI = (genai, types)
return _GOOGLE_GENAI
def _get_chromadb():
global _CHROMADB
if _CHROMADB is None:
import chromadb
from chromadb.config import Settings
_CHROMADB = (chromadb, Settings)
return _CHROMADB
global _CHROMADB
if _CHROMADB is None:
import chromadb
from chromadb.config import Settings
_CHROMADB = (chromadb, Settings)
return _CHROMADB
class BaseEmbeddingProvider:
def embed(self, texts: List[str]) -> List[List[float]]:
raise NotImplementedError()
def embed(self, texts: List[str]) -> List[List[float]]:
raise NotImplementedError()
class LocalEmbeddingProvider(BaseEmbeddingProvider):
def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
self.model = None
try:
ST = _get_sentence_transformers()
if ST:
self.model = ST(model_name)
except Exception as e:
sys.stderr.write(f"LocalEmbeddingProvider failed to load model {model_name}: {e}. Using dummy embeddings.\n")
sys.stderr.flush()
def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
self.model = None
try:
ST = _get_sentence_transformers()
if ST:
self.model = ST(model_name)
except Exception as e:
sys.stderr.write(f"LocalEmbeddingProvider failed to load model {model_name}: {e}. Using dummy embeddings.\n")
sys.stderr.flush()
def embed(self, texts: List[str]) -> List[List[float]]:
if self.model:
embeddings = self.model.encode(texts)
return embeddings.tolist()
else:
# Dummy embeddings (384 dims for all-MiniLM-L6-v2)
return [[0.0] * 384 for _ in texts]
def embed(self, texts: List[str]) -> List[List[float]]:
if self.model:
embeddings = self.model.encode(texts)
return embeddings.tolist()
else:
# Dummy embeddings (384 dims for all-MiniLM-L6-v2)
return [[0.0] * 384 for _ in texts]
class GeminiEmbeddingProvider(BaseEmbeddingProvider):
def __init__(self, model_name: str = 'gemini-embedding-001'):
self.model_name = model_name
def __init__(self, model_name: str = 'gemini-embedding-001'):
self.model_name = model_name
def embed(self, texts: List[str]) -> List[List[float]]:
google_module = _get_google_genai()
if google_module is None:
raise ImportError("google-genai is not installed")
genai_pkg, types = google_module
from src import ai_client
ai_client._ensure_gemini_client()
client = ai_client._gemini_client
if not client:
raise ValueError("Gemini client not initialized")
res = client.models.embed_content(
model=self.model_name,
contents=texts,
config=types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT")
)
return [e.values for e in res.embeddings]
def embed(self, texts: List[str]) -> List[List[float]]:
google_module = _get_google_genai()
if google_module is None:
raise ImportError("google-genai is not installed")
genai_pkg, types = google_module
ai_client._ensure_gemini_client()
client = ai_client._gemini_client
if not client:
raise ValueError("Gemini client not initialized")
res = client.models.embed_content(
model = self.model_name,
contents = texts,
config = types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT")
)
return [e.values for e in res.embeddings]
class RAGEngine:
def __init__(self, config: models.RAGConfig, base_dir: str = "."):
self.config = copy.deepcopy(config)
self.base_dir = base_dir
self.client = None
self.collection = None
self.embedding_provider = None
def __init__(self, config: models.RAGConfig, base_dir: str = "."):
self.config = copy.deepcopy(config)
self.base_dir = base_dir
self.client = None
self.collection = None
self.embedding_provider = None
if not self.config.enabled:
return
if not self.config.enabled: return
self._init_embedding_provider()
self._init_vector_store()
self._init_embedding_provider()
self._init_vector_store()
def _init_embedding_provider(self):
if self.config.embedding_provider == 'gemini':
self.embedding_provider = GeminiEmbeddingProvider()
elif self.config.embedding_provider == 'local':
self.embedding_provider = LocalEmbeddingProvider()
else:
raise ValueError(f"Unknown embedding provider: {self.config.embedding_provider}")
def _init_embedding_provider(self):
if self.config.embedding_provider == 'gemini':
self.embedding_provider = GeminiEmbeddingProvider()
elif self.config.embedding_provider == 'local':
self.embedding_provider = LocalEmbeddingProvider()
else:
raise ValueError(f"Unknown embedding provider: {self.config.embedding_provider}")
def _init_vector_store(self):
vs_config = self.config.vector_store
if vs_config.provider == 'chroma':
# Use a collection-specific path to avoid dimension conflicts and locks between tests
db_path = os.path.abspath(os.path.join(self.base_dir, ".slop_cache", f"chroma_{vs_config.collection_name}"))
os.makedirs(db_path, exist_ok=True)
chroma_module = _get_chromadb()
if chroma_module is None:
raise ImportError("chromadb is not installed")
chromadb, Settings = chroma_module
self.client = chromadb.PersistentClient(path=db_path)
self.collection = self.client.get_or_create_collection(name=vs_config.collection_name)
elif vs_config.provider == 'mock':
self.client = "mock"
self.collection = "mock"
else:
raise ValueError(f"Unknown vector store provider: {vs_config.provider}")
def _init_vector_store(self):
vs_config = self.config.vector_store
if vs_config.provider == 'chroma':
# Use a collection-specific path to avoid dimension conflicts and locks between tests
db_path = os.path.abspath(os.path.join(self.base_dir, ".slop_cache", f"chroma_{vs_config.collection_name}"))
os.makedirs(db_path, exist_ok=True)
chroma_module = _get_chromadb()
if chroma_module is None:
raise ImportError("chromadb is not installed")
chromadb, Settings = chroma_module
self.client = chromadb.PersistentClient(path=db_path)
self.collection = self.client.get_or_create_collection(name=vs_config.collection_name)
elif vs_config.provider == 'mock':
self.client = "mock"
self.collection = "mock"
else:
raise ValueError(f"Unknown vector store provider: {vs_config.provider}")
def is_empty(self) -> bool:
if not self.config.enabled:
return True
if self.config.vector_store.provider == 'mock' or self.collection == "mock":
return True
if self.collection is None:
return True
return self.collection.count() == 0
def is_empty(self) -> bool:
if not self.config.enabled:
return True
if self.config.vector_store.provider == 'mock' or self.collection == "mock":
return True
if self.collection is None:
return True
return self.collection.count() == 0
def add_documents(self, ids: List[str], texts: List[str], metadatas: Optional[List[Dict[str, Any]]] = None):
"""
[C: tests/test_rag_engine.py:test_rag_engine_chroma]
"""
if not self.config.enabled or self.collection == "mock":
return
embeddings = self.embedding_provider.embed(texts)
self.collection.upsert(
ids=ids,
embeddings=embeddings,
documents=texts,
metadatas=metadatas
)
def add_documents(self, ids: List[str], texts: List[str], metadatas: Optional[List[Dict[str, Any]]] = None):
"""
[C: tests/test_rag_engine.py:test_rag_engine_chroma]
"""
if not self.config.enabled or self.collection == "mock":
return
embeddings = self.embedding_provider.embed(texts)
self.collection.upsert(
ids = ids,
embeddings = embeddings,
documents = texts,
metadatas = metadatas
)
def _chunk_text(self, content: str) -> List[str]:
"""Character-based chunking with overlap."""
chunks = []
if not content:
return chunks
chunk_size = self.config.chunk_size
overlap = self.config.chunk_overlap
start = 0
while start < len(content):
end = start + chunk_size
chunks.append(content[start:end])
if end >= len(content):
break
start += (chunk_size - overlap)
return chunks
def _chunk_text(self, content: str) -> List[str]:
"""Character-based chunking with overlap."""
chunks = []
if not content:
return chunks
chunk_size = self.config.chunk_size
overlap = self.config.chunk_overlap
start = 0
while start < len(content):
end = start + chunk_size
chunks.append(content[start:end])
if end >= len(content):
break
start += (chunk_size - overlap)
return chunks
def _chunk_code(self, content: str, file_path: str) -> List[str]:
"""AST-aware chunking for Python code."""
try:
from src.file_cache import ASTParser
parser = ASTParser("python")
tree = parser.parse(content)
chunks = []
def _chunk_code(self, content: str, file_path: str) -> List[str]:
"""AST-aware chunking for Python code."""
try:
parser = ASTParser("python")
tree = parser.parse(content)
chunks = []
for node in tree.root_node.children:
if node.type in ("function_definition", "class_definition"):
chunks.append(content[node.start_byte:node.end_byte])
for node in tree.root_node.children:
if node.type in ("function_definition", "class_definition"):
chunks.append(content[node.start_byte:node.end_byte])
if not chunks or len(content) < self.config.chunk_size:
return self._chunk_text(content)
return chunks
except Exception:
return self._chunk_text(content)
if not chunks or len(content) < self.config.chunk_size:
return self._chunk_text(content)
return chunks
except Exception:
return self._chunk_text(content)
def index_file(self, file_path: str):
"""Reads, chunks, and indexes a file into the vector store."""
if not self.config.enabled or self.collection == "mock":
return
def index_file(self, file_path: str):
"""Reads, chunks, and indexes a file into the vector store."""
if not self.config.enabled or self.collection == "mock":
return
full_path = os.path.join(self.base_dir, file_path)
if not os.path.exists(full_path):
return
full_path = os.path.join(self.base_dir, file_path)
if not os.path.exists(full_path):
return
try:
mtime = os.path.getmtime(full_path)
except Exception:
return
try:
mtime = os.path.getmtime(full_path)
except Exception:
return
try:
res = self.collection.get(where={"path": file_path}, limit=1, include=["metadatas"])
if res and res["metadatas"] and res["metadatas"][0]:
if res["metadatas"][0].get("mtime") == mtime:
return
except Exception:
pass
try:
res = self.collection.get(where={"path": file_path}, limit=1, include=["metadatas"])
if res and res["metadatas"] and res["metadatas"][0]:
if res["metadatas"][0].get("mtime") == mtime:
return
except Exception:
pass
try:
with open(full_path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
except Exception:
return
try:
with open(full_path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
except Exception:
return
self.collection.delete(where={"path": file_path})
self.collection.delete(where={"path": file_path})
if file_path.lower().endswith(".py"):
chunks = self._chunk_code(content, file_path)
else:
chunks = self._chunk_text(content)
if file_path.lower().endswith(".py"):
chunks = self._chunk_code(content, file_path)
else:
chunks = self._chunk_text(content)
if not chunks:
return
if not chunks:
return
ids = [f"{file_path}_{i}" for i in range(len(chunks))]
metadatas = [{"path": file_path, "chunk": i, "mtime": mtime} for i in range(len(chunks))]
self.add_documents(ids, chunks, metadatas)
ids = [f"{file_path}_{i}" for i in range(len(chunks))]
metadatas = [{"path": file_path, "chunk": i, "mtime": mtime} for i in range(len(chunks))]
self.add_documents(ids, chunks, metadatas)
def _search_mcp(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
async def _async_search_mcp():
tool_name = self.config.vector_store.mcp_tool or "rag_search"
args = {"query": query, "top_k": top_k}
res_str = await mcp_client.async_dispatch(tool_name, args)
try:
data = json.loads(res_str)
if isinstance(data, list):
return data
elif isinstance(data, dict) and "results" in data:
return data["results"]
return []
except:
return []
def _search_mcp(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
async def _async_search_mcp():
tool_name = self.config.vector_store.mcp_tool or "rag_search"
args = {"query": query, "top_k": top_k}
res_str = await mcp_client.async_dispatch(tool_name, args)
try:
data = json.loads(res_str)
if isinstance(data, list):
return data
elif isinstance(data, dict) and "results" in data:
return data["results"]
return []
except:
return []
return asyncio.run(_async_search_mcp())
return asyncio.run(_async_search_mcp())
def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
"""
[C: tests/mock_concurrent_mma.py:main, tests/test_rag_engine.py:test_rag_engine_chroma]
"""
if not self.config.enabled:
return []
if self.config.vector_store.provider == 'mcp':
return self._search_mcp(query, top_k)
if self.collection == "mock":
return []
def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
"""
[C: tests/mock_concurrent_mma.py:main, tests/test_rag_engine.py:test_rag_engine_chroma]
"""
if not self.config.enabled: return []
if self.config.vector_store.provider == 'mcp': return self._search_mcp(query, top_k)
if self.collection == "mock": return []
query_embedding = self.embedding_provider.embed([query])[0]
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=top_k
)
query_embedding = self.embedding_provider.embed([query])[0]
results = self.collection.query(
query_embeddings = [query_embedding],
n_results = top_k
)
ret = []
if results and results["ids"] and results["ids"][0]:
for i in range(len(results["ids"][0])):
ret.append({
"id": results["ids"][0][i],
"document": results["documents"][0][i],
"metadata": results["metadatas"][0][i] if results["metadatas"] else {},
"distance": results["distances"][0][i] if "distances" in results and results["distances"] else 0.0
})
return ret
ret = []
if results and results["ids"] and results["ids"][0]:
for i in range(len(results["ids"][0])):
ret.append({
"id": results["ids"][0][i],
"document": results["documents"][0][i],
"metadata": results["metadatas"][0][i] if results["metadatas"] else {},
"distance": results["distances"][0][i] if "distances" in results and results["distances"] else 0.0
})
return ret
def delete_documents(self, ids: List[str]):
"""
[C: tests/test_rag_engine.py:test_rag_engine_chroma]
"""
if not self.config.enabled or self.collection == "mock":
return
self.collection.delete(ids=ids)
def delete_documents(self, ids: List[str]):
"""
[C: tests/test_rag_engine.py:test_rag_engine_chroma]
"""
if not self.config.enabled or self.collection == "mock":
return
self.collection.delete(ids=ids)
def get_all_indexed_paths(self) -> List[str]:
if not self.config.enabled or self.collection == "mock":
return []
res = self.collection.get(include=["metadatas"])
if not res or not res["metadatas"]:
return []
return list(set(m.get("path") for m in res["metadatas"] if m.get("path")))
def get_all_indexed_paths(self) -> List[str]:
if not self.config.enabled or self.collection == "mock":
return []
res = self.collection.get(include=["metadatas"])
if not res or not res["metadatas"]:
return []
return list(set(m.get("path") for m in res["metadatas"] if m.get("path")))
def delete_documents_by_path(self, file_paths: List[str]):
if not self.config.enabled or self.collection == "mock":
return
for path in file_paths:
self.collection.delete(where={"path": path})
def delete_documents_by_path(self, file_paths: List[str]):
if not self.config.enabled or self.collection == "mock":
return
for path in file_paths:
self.collection.delete(where={"path": path})
+34 -35
View File
@@ -27,27 +27,27 @@ from typing import Any, Optional, TextIO
from src import paths
_ts: str = "" # session timestamp string e.g. "20260301_142233"
_session_id: str = "" # YYYYMMDD_HHMMSS[_Label]
_session_dir: Optional[Path] = None # Path to the sub-directory for this session
_seq: int = 0 # monotonic counter for script files this session
_output_seq: int = 0 # monotonic counter for output files this session
_seq_lock: threading.Lock = threading.Lock()
_ts: str = "" # session timestamp string e.g. "20260301_142233"
_session_id: str = "" # YYYYMMDD_HHMMSS[_Label]
_session_dir: Optional[Path] = None # Path to the sub-directory for this session
_seq: int = 0 # monotonic counter for script files this session
_output_seq: int = 0 # monotonic counter for output files this session
_seq_lock: threading.Lock = threading.Lock()
_output_seq_lock: threading.Lock = threading.Lock()
_comms_fh: Optional[TextIO] = None # file handle: logs/sessions/<session_id>/comms.log
_tool_fh: Optional[TextIO] = None # file handle: logs/sessions/<session_id>/toolcalls.log
_api_fh: Optional[TextIO] = None # file handle: logs/sessions/<session_id>/apihooks.log
_cli_fh: Optional[TextIO] = None # file handle: logs/sessions/<session_id>/clicalls.log
_tool_fh: Optional[TextIO] = None # file handle: logs/sessions/<session_id>/toolcalls.log
_api_fh: Optional[TextIO] = None # file handle: logs/sessions/<session_id>/apihooks.log
_cli_fh: Optional[TextIO] = None # file handle: logs/sessions/<session_id>/clicalls.log
def _now_ts() -> str:
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def open_session(label: Optional[str] = None) -> None:
"""
Called once at GUI startup. Creates the log directories if needed and
opens the log files for this session within a sub-directory.
[C: tests/test_app_controller_offloading.py:tmp_session_dir, tests/test_logging_e2e.py:test_logging_e2e, tests/test_session_logger_optimization.py:test_log_tool_call_saves_in_session_scripts, tests/test_session_logger_optimization.py:test_log_tool_output_saves_in_session_outputs, tests/test_session_logger_optimization.py:test_session_directory_and_subdirectories_creation, tests/test_session_logger_reset.py:test_reset_session, tests/test_session_logging.py:test_open_session_creates_subdir_and_registry]
Called once at GUI startup. Creates the log directories if needed and
opens the log files for this session within a sub-directory.
[C: tests/test_app_controller_offloading.py:tmp_session_dir, tests/test_logging_e2e.py:test_logging_e2e, tests/test_session_logger_optimization.py:test_log_tool_call_saves_in_session_scripts, tests/test_session_logger_optimization.py:test_log_tool_output_saves_in_session_outputs, tests/test_session_logger_optimization.py:test_session_directory_and_subdirectories_creation, tests/test_session_logger_reset.py:test_reset_session, tests/test_session_logging.py:test_open_session_creates_subdir_and_registry]
"""
global _ts, _session_id, _session_dir, _comms_fh, _tool_fh, _api_fh, _cli_fh, _seq, _output_seq
if _comms_fh is not None:
@@ -66,12 +66,12 @@ def open_session(label: Optional[str] = None) -> None:
paths.get_scripts_dir().mkdir(parents=True, exist_ok=True)
_seq = 0
_seq = 0
_output_seq = 0
_comms_fh = open(_session_dir / "comms.log", "w", encoding="utf-8", buffering=1)
_tool_fh = open(_session_dir / "toolcalls.log", "w", encoding="utf-8", buffering=1)
_api_fh = open(_session_dir / "apihooks.log", "w", encoding="utf-8", buffering=1)
_cli_fh = open(_session_dir / "clicalls.log", "w", encoding="utf-8", buffering=1)
_comms_fh = open(_session_dir / "comms.log", "w", encoding="utf-8", buffering=1)
_tool_fh = open(_session_dir / "toolcalls.log", "w", encoding="utf-8", buffering=1)
_api_fh = open(_session_dir / "apihooks.log", "w", encoding="utf-8", buffering=1)
_cli_fh = open(_session_dir / "clicalls.log", "w", encoding="utf-8", buffering=1)
_tool_fh.write(f"# Tool-call log — session {_session_id}\n\n")
_tool_fh.flush()
@@ -90,9 +90,8 @@ def open_session(label: Optional[str] = None) -> None:
def close_session() -> None:
"""
Flush and close all log files. Called on clean exit.
[C: tests/test_app_controller_offloading.py:tmp_session_dir, tests/test_logging_e2e.py:e2e_setup, tests/test_logging_e2e.py:test_logging_e2e, tests/test_session_logger_optimization.py:temp_session_setup, tests/test_session_logger_reset.py:temp_logs, tests/test_session_logging.py:temp_logs]
Flush and close all log files. Called on clean exit.
[C: tests/test_app_controller_offloading.py:tmp_session_dir, tests/test_logging_e2e.py:e2e_setup, tests/test_logging_e2e.py:test_logging_e2e, tests/test_session_logger_optimization.py:temp_session_setup, tests/test_session_logger_reset.py:temp_logs, tests/test_session_logging.py:temp_logs]
"""
global _comms_fh, _tool_fh, _api_fh, _cli_fh, _session_id
if _comms_fh is None:
@@ -137,9 +136,9 @@ def log_api_hook(method: str, path: str, payload: str) -> None:
def log_comms(entry: dict[str, Any]) -> None:
"""
Append one comms entry to the comms log file as a JSON-L line.
Thread-safe (GIL + line-buffered file).
[C: tests/test_logging_e2e.py:test_logging_e2e]
Append one comms entry to the comms log file as a JSON-L line.
Thread-safe (GIL + line-buffered file).
[C: tests/test_logging_e2e.py:test_logging_e2e]
"""
if _comms_fh is None:
return
@@ -150,9 +149,9 @@ def log_comms(entry: dict[str, Any]) -> None:
def log_tool_call(script: str, result: str, script_path: Optional[str]) -> Optional[str]:
"""
Append a tool-call record to the toolcalls log and write the PS1 script to
the session's scripts directory. Returns the path of the written script file.
[C: tests/test_session_logger_optimization.py:test_log_tool_call_saves_in_session_scripts]
Append a tool-call record to the toolcalls log and write the PS1 script to
the session's scripts directory. Returns the path of the written script file.
[C: tests/test_session_logger_optimization.py:test_log_tool_call_saves_in_session_scripts]
"""
global _seq
if _tool_fh is None:
@@ -193,9 +192,9 @@ def log_tool_call(script: str, result: str, script_path: Optional[str]) -> Optio
def log_tool_output(content: str) -> Optional[str]:
"""
Save tool output content to a unique file in the session's outputs directory.
Returns the path of the written file.
[C: tests/test_session_logger_optimization.py:test_log_tool_output_returns_none_if_no_session, tests/test_session_logger_optimization.py:test_log_tool_output_saves_in_session_outputs]
Save tool output content to a unique file in the session's outputs directory.
Returns the path of the written file.
[C: tests/test_session_logger_optimization.py:test_log_tool_output_returns_none_if_no_session, tests/test_session_logger_optimization.py:test_log_tool_output_saves_in_session_outputs]
"""
global _output_seq
if _session_dir is None:
@@ -221,14 +220,14 @@ def log_cli_call(command: str, stdin_content: Optional[str], stdout_content: Opt
ts_entry = datetime.datetime.now().strftime("%H:%M:%S")
try:
log_data = {
"timestamp": ts_entry,
"command": command,
"stdin": stdin_content,
"stdout": stdout_content,
"stderr": stderr_content,
"timestamp": ts_entry,
"command": command,
"stdin": stdin_content,
"stdout": stdout_content,
"stderr": stderr_content,
"latency_sec": latency
}
_cli_fh.write(json.dumps(log_data, ensure_ascii=False, default=str) + "\n")
_cli_fh.flush()
except Exception:
pass
pass
+6 -9
View File
@@ -3,13 +3,12 @@ from imgui_bundle import imgui
def draw_soft_shadow(draw_list: imgui.ImDrawList, p_min: imgui.ImVec2, p_max: imgui.ImVec2, color: imgui.ImVec4, shadow_size: float = 10.0, rounding: float = 0.0) -> None:
"""
Simulates a soft shadow effect by drawing multiple concentric rounded rectangles
with decreasing alpha values. This is a faux-shader effect using primitive batching.
Simulates a soft shadow effect by drawing multiple concentric rounded rectangles
with decreasing alpha values. This is a faux-shader effect using primitive batching.
"""
r, g, b, a = color.x, color.y, color.z, color.w
steps = int(shadow_size)
if steps <= 0:
return
steps = int(shadow_size)
if steps <= 0: return
alpha_step = a / steps
@@ -17,12 +16,10 @@ def draw_soft_shadow(draw_list: imgui.ImDrawList, p_min: imgui.ImVec2, p_max: im
current_alpha = a - (i * alpha_step)
# Apply an easing function (e.g., cubic) for a smoother shadow falloff
current_alpha = current_alpha * (1.0 - (i / steps)**2)
if current_alpha <= 0.01:
continue
expand = float(i)
expand = float(i)
c_min = imgui.ImVec2(p_min.x - expand, p_min.y - expand)
c_max = imgui.ImVec2(p_max.x + expand, p_max.y + expand)
@@ -35,4 +32,4 @@ def draw_soft_shadow(draw_list: imgui.ImDrawList, p_min: imgui.ImVec2, p_max: im
rounding + expand if rounding > 0 else 0.0,
flags=imgui.ImDrawFlags_.round_corners_all if rounding > 0 else imgui.ImDrawFlags_.none,
thickness=1.0
)
)
+14 -14
View File
@@ -55,24 +55,24 @@ def _build_subprocess_env() -> dict[str, str]:
def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str:
"""
Run a PowerShell script with working directory set to base_dir.
Returns a string combining stdout, stderr, and exit code.
Environment is configured via mcp_env.toml (project root).
If qa_callback is provided and the command fails or has stderr,
the callback is called with the stderr content and its result is appended.
If patch_callback is provided, it receives (error, file_context) and returns patch text.
[C: tests/test_tier4_interceptor.py:test_run_powershell_no_qa_callback_on_success, tests/test_tier4_interceptor.py:test_run_powershell_optional_qa_callback, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_stderr_only]
Run a PowerShell script with working directory set to base_dir.
Returns a string combining stdout, stderr, and exit code.
Environment is configured via mcp_env.toml (project root).
If qa_callback is provided and the command fails or has stderr,
the callback is called with the stderr content and its result is appended.
If patch_callback is provided, it receives (error, file_context) and returns patch text.
[C: tests/test_tier4_interceptor.py:test_run_powershell_no_qa_callback_on_success, tests/test_tier4_interceptor.py:test_run_powershell_optional_qa_callback, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_stderr_only]
"""
safe_dir: str = str(base_dir).replace("'", "''")
full_script: str = f"Set-Location -LiteralPath '{safe_dir}'\n{script}"
exe: Optional[str] = next((x for x in ["powershell.exe", "pwsh.exe", "powershell", "pwsh"] if shutil.which(x)), None)
safe_dir: str = str(base_dir).replace("'", "''")
full_script: str = f"Set-Location -LiteralPath '{safe_dir}'\n{script}"
exe: Optional[str] = next((x for x in ["powershell.exe", "pwsh.exe", "powershell", "pwsh"] if shutil.which(x)), None)
if not exe: return "ERROR: Neither powershell nor pwsh found in PATH"
try:
process = subprocess.Popen(
[exe, "-NoProfile", "-NonInteractive", "-Command", full_script],
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
cwd=base_dir, env=_build_subprocess_env(),
stdin = subprocess.DEVNULL,
stdout = subprocess.PIPE, stderr=subprocess.PIPE, text=True,
cwd = base_dir, env=_build_subprocess_env(),
)
stdout, stderr = process.communicate(timeout=TIMEOUT_SECONDS)
parts: list[str] = []
@@ -99,4 +99,4 @@ def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[s
except Exception as e:
if 'process' in locals() and process:
subprocess.run(["taskkill", "/F", "/T", "/PID", str(process.pid)], capture_output=True)
return f"ERROR: {e}"
return f"ERROR: {e}"
+40 -56
View File
@@ -1,23 +1,12 @@
# summarize.py
"""
Note(Gemini):
Local heuristic summariser. Doesn't use any AI or network.
Uses Python's AST to reliably pull out classes, methods, and functions.
Regex is used for TOML and Markdown.
The rationale here is simple: giving the AI the *structure* of a codebase is 90%
as good as giving it the full source, but costs 1% of the tokens.
If it needs the full source of a file after reading the summary, it can just call read_file.
"""
# summarize.py
"""
Local symbolic summariser no AI calls, no network.
For each file, extracts structural information:
.py : imports, classes (with methods), top-level functions, global constants
.toml : top-level table keys + array lengths
.md : headings (h1-h3)
other : line count + first 8 lines as preview
.py : imports, classes (with methods), top-level functions, global constants
.toml : top-level table keys + array lengths
.md : headings (h1-h3)
other : line count + first 8 lines as preview
Returns a compact markdown string per file, suitable for use as a low-token
context block that replaces full file contents in the initial <context> send.
@@ -28,6 +17,8 @@ import re
from pathlib import Path
from typing import Callable, Any
from src import ai_client
from src.summary_cache import SummaryCache, get_file_hash
@@ -37,9 +28,9 @@ _summary_cache = SummaryCache()
# ------------------------------------------------------------------ per-type extractors
def _summarise_python(path: Path, content: str) -> str:
lines = content.splitlines()
lines = content.splitlines()
line_count = len(lines)
parts = [f"**Python** — {line_count} lines"]
parts = [f"**Python** — {line_count} lines"]
try:
tree = ast.parse(content.lstrip(chr(0xFEFF)), filename=str(path))
except SyntaxError as e:
@@ -73,31 +64,28 @@ def _summarise_python(path: Path, content: str) -> str:
n.name for n in ast.iter_child_nodes(node)
if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))
]
if methods:
parts.append(f"class {node.name}: {', '.join(methods)}")
else:
parts.append(f"class {node.name}")
if methods: parts.append(f"class {node.name}: {', '.join(methods)}")
else: parts.append(f"class {node.name}")
top_fns = [
node.name for node in ast.iter_child_nodes(tree)
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
]
if top_fns:
parts.append(f"functions: {', '.join(top_fns)}")
if top_fns: parts.append(f"functions: {', '.join(top_fns)}")
return "\n".join(parts)
def _summarise_toml(path: Path, content: str) -> str:
lines = content.splitlines()
lines = content.splitlines()
line_count = len(lines)
parts = [f"**TOML** — {line_count} lines"]
table_pat = re.compile(r"^\s*\[{1,2}([^\[\]]+)\]{1,2}")
tables = []
parts = [f"**TOML** — {line_count} lines"]
table_pat = re.compile(r"^\s*\[{1,2}([^\[\]]+)\]{1,2}")
tables = []
for line in lines:
m = table_pat.match(line)
if m:
tables.append(m.group(1).strip())
if tables:
parts.append(f"tables: {', '.join(tables)}")
kv_pat = re.compile(r"^([a-zA-Z_][a-zA-Z0-9_]*)\s*=")
kv_pat = re.compile(r"^([a-zA-Z_][a-zA-Z0-9_]*)\s*=")
in_table = False
top_keys = []
for line in lines:
@@ -113,15 +101,15 @@ def _summarise_toml(path: Path, content: str) -> str:
return "\n".join(parts)
def _summarise_markdown(path: Path, content: str) -> str:
lines = content.splitlines()
lines = content.splitlines()
line_count = len(lines)
parts = [f"**Markdown** — {line_count} lines"]
headings = []
parts = [f"**Markdown** — {line_count} lines"]
headings = []
for line in lines:
m = re.match(r"^(#{1,3})\s+(.+)", line)
if m:
level = len(m.group(1))
text = m.group(2).strip()
level = len(m.group(1))
text = m.group(2).strip()
indent = " " * (level - 1)
headings.append(f"{indent}{text}")
if headings:
@@ -129,10 +117,10 @@ def _summarise_markdown(path: Path, content: str) -> str:
return "\n".join(parts)
def _summarise_generic(path: Path, content: str) -> str:
lines = content.splitlines()
lines = content.splitlines()
line_count = len(lines)
suffix = path.suffix.lstrip(".").upper() or "TEXT"
parts = [f"**{suffix}** — {line_count} lines"]
suffix = path.suffix.lstrip(".").upper() or "TEXT"
parts = [f"**{suffix}** — {line_count} lines"]
# Heuristic for C-style languages
important_lines = []
@@ -168,24 +156,20 @@ _SUMMARISERS: dict[str, Callable[[Path, str], str]] = {
def summarise_file(path: Path, content: str) -> str:
"""
Return a compact markdown summary string for a single file.
`content` is the already-read file text (or an error string).
[C: tests/test_subagent_summarization.py:test_summarise_file_integration]
Return a compact markdown summary string for a single file.
`content` is the already-read file text (or an error string).
[C: tests/test_subagent_summarization.py:test_summarise_file_integration]
"""
content_hash = get_file_hash(content)
cached = _summary_cache.get_summary(str(path), content_hash)
if cached:
return cached
cached = _summary_cache.get_summary(str(path), content_hash)
if cached: return cached
suffix = path.suffix.lower() if hasattr(path, "suffix") else ""
fn = _SUMMARISERS.get(suffix, _summarise_generic)
fn = _SUMMARISERS.get(suffix, _summarise_generic)
try:
heuristic_outline = fn(path, content)
# Smart AI Summarization
is_code = suffix in [".py", ".ps1", ".js", ".ts", ".cpp", ".c", ".h", ".cs", ".go", ".rs", ".lua"]
try:
from src import ai_client
smart_summary = ai_client.run_subagent_summarization(
file_path=str(path),
content=content[:10000],
@@ -205,31 +189,31 @@ def summarise_file(path: Path, content: str) -> str:
def summarise_items(file_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Given a list of file_item dicts (as returned by aggregate.build_file_items),
return a parallel list of dicts with an added `summary` key.
Given a list of file_item dicts (as returned by aggregate.build_file_items),
return a parallel list of dicts with an added `summary` key.
"""
result = []
for item in file_items:
path = item.get("path")
path = item.get("path")
content = item.get("content", "")
error = item.get("error", False)
error = item.get("error", False)
if error or path is None:
summary = "_Error reading file_"
else:
p = Path(path) if not isinstance(path, Path) else path
p = Path(path) if not isinstance(path, Path) else path
summary = summarise_file(p, content)
result.append({**item, "summary": summary})
return result
def build_summary_markdown(file_items: list[dict[str, Any]]) -> str:
"""
Build a compact markdown string of file summaries, suitable for the
initial <context> block instead of full file contents.
Build a compact markdown string of file summaries, suitable for the
initial <context> block instead of full file contents.
"""
summarised = summarise_items(file_items)
parts = []
parts = []
for item in summarised:
path = item.get("path") or item.get("entry", "unknown")
path = item.get("path") or item.get("entry", "unknown")
summary = item.get("summary", "")
parts.append(f"### `{path}`\n\n{summary}")
return "\n\n---\n\n".join(parts)
return "\n\n---\n\n".join(parts)
+13 -20
View File
@@ -7,18 +7,15 @@ from typing import Optional, Dict
def get_file_hash(content: str) -> str:
"""
Returns SHA256 hash of the content.
[C: tests/test_summary_cache.py:test_get_file_hash, tests/test_summary_cache.py:test_summary_cache]
Returns SHA256 hash of the content.
[C: tests/test_summary_cache.py:test_get_file_hash, tests/test_summary_cache.py:test_summary_cache]
"""
return hashlib.sha256(content.encode("utf-8")).hexdigest()
class SummaryCache:
"""
A hash-based cache for file summaries to avoid redundant processing.
Invalidates when content hash changes.
A hash-based cache for file summaries to avoid redundant processing.
Invalidates when content hash changes.
"""
def __init__(self, cache_file: Optional[str] = None, max_entries: int = 1000):
if cache_file:
@@ -32,9 +29,8 @@ class SummaryCache:
def load(self) -> None:
"""
Loads cache from disk.
[C: src/tool_presets.py:ToolPresetManager._read_raw, src/workspace_manager.py:WorkspaceManager._load_file, tests/test_gui_phase3.py:test_create_track, tests/test_history_management.py:test_save_separation, tests/test_session_logging.py:test_open_session_creates_subdir_and_registry]
Loads cache from disk.
[C: src/tool_presets.py:ToolPresetManager._read_raw, src/workspace_manager.py:WorkspaceManager._load_file, tests/test_gui_phase3.py:test_create_track, tests/test_history_management.py:test_save_separation, tests/test_session_logging.py:test_open_session_creates_subdir_and_registry]
"""
if self.cache_file.exists():
try:
@@ -54,9 +50,8 @@ class SummaryCache:
def get_summary(self, file_path: str, content_hash: str) -> Optional[str]:
"""
Returns cached summary if hash matches, otherwise None.
[C: tests/test_summary_cache.py:test_summary_cache, tests/test_summary_cache.py:test_summary_cache_lru]
Returns cached summary if hash matches, otherwise None.
[C: tests/test_summary_cache.py:test_summary_cache, tests/test_summary_cache.py:test_summary_cache_lru]
"""
entry = self.cache.get(file_path)
if entry and entry.get("hash") == content_hash:
@@ -68,9 +63,8 @@ class SummaryCache:
def set_summary(self, file_path: str, content_hash: str, summary: str) -> None:
"""
Stores summary in cache and saves to disk.
[C: tests/test_summary_cache.py:test_summary_cache, tests/test_summary_cache.py:test_summary_cache_lru]
Stores summary in cache and saves to disk.
[C: tests/test_summary_cache.py:test_summary_cache, tests/test_summary_cache.py:test_summary_cache_lru]
"""
if file_path in self.cache:
self.cache.pop(file_path)
@@ -87,9 +81,8 @@ class SummaryCache:
def clear(self) -> None:
"""
Clears the cache both in-memory and on disk.
[C: tests/conftest.py:reset_ai_client]
Clears the cache both in-memory and on disk.
[C: tests/conftest.py:reset_ai_client]
"""
self.cache.clear()
if self.cache_file.exists():
@@ -109,4 +102,4 @@ class SummaryCache:
return {
"entries": len(self.cache),
"size_bytes": size_bytes
}
}
+66 -67
View File
@@ -14,7 +14,7 @@ from contextlib import nullcontext
from imgui_bundle import imgui, hello_imgui
from typing import Any, Optional
import src.theme_nerv
from src import theme_nerv
from src import imgui_scopes as imscope
from src.theme_nerv import DATA_GREEN
@@ -60,18 +60,18 @@ def _build_semantic_colour_dict(theme: ThemeFile) -> dict[str, tuple[float, floa
_BUILTIN_PALETTES: dict[str, dict[int, tuple]] = {
"ImGui Dark": {},
"NERV": {},
"NERV": {},
}
_TOML_PALETTES: dict[str, ThemeFile] = {}
_TOML_COLOUR_CACHE: dict[str, dict[int, tuple[float, float, float, float]]] = {}
_TOML_PALETTES: dict[str, ThemeFile] = {}
_TOML_COLOUR_CACHE: dict[str, dict[int, tuple[float, float, float, float]]] = {}
_TOML_SEMANTIC_CACHE: dict[str, dict[str, tuple[float, float, float, float]]] = {}
_current_palette: str = "10x Dark"
_current_font_path: str = "fonts/Inter-Regular.ttf"
_current_font_size: float = 16.0
_current_scale: float = 1.0
_transparency: float = 1.0
_current_palette: str = "10x Dark"
_current_font_path: str = "fonts/Inter-Regular.ttf"
_current_font_size: float = 16.0
_current_scale: float = 1.0
_transparency: float = 1.0
_child_transparency: float = 1.0
# Per-palette tone mapping: { "Palette Name": value }
@@ -106,8 +106,8 @@ def _tone_map(rgb: tuple[float, float, float, float], palette: str) -> tuple[flo
r = max(0, r)**(1.0/g); g_val = max(0, g_val)**(1.0/g); bl = max(0, bl)**(1.0/g)
return (max(0.0, min(1.0, r)), max(0.0, min(1.0, g_val)), max(0.0, min(1.0, bl)), a)
_crt_filter = CRTFilter()
_alert_pulsing = AlertPulsing()
_crt_filter = CRTFilter()
_alert_pulsing = AlertPulsing()
_status_flicker = StatusFlicker()
# ------------------------------------------------------------------ public API
@@ -160,7 +160,7 @@ def get_color(name: str, alpha: float = 1.0) -> imgui.ImVec4:
if palette_name in _TOML_SEMANTIC_CACHE:
d = _TOML_SEMANTIC_CACHE[palette_name]
if name in d:
rgba = list(d[name])
rgba = list(d[name])
rgba[3] = alpha
return imgui.ImVec4(*_tone_map(tuple(rgba), palette_name))
@@ -172,25 +172,25 @@ def get_color(name: str, alpha: float = 1.0) -> imgui.ImVec4:
# 2. Hardcoded fallbacks if not in TOML (matches ThemePalette defaults)
fallbacks = {
"text": (200, 200, 200),
"text_disabled": (130, 130, 130),
"status_success": (80, 255, 80),
"status_warning": (255, 152, 48),
"status_error": (255, 72, 64),
"status_info": (0, 255, 255),
"bubble_user": (30, 45, 75),
"bubble_ai": (35, 65, 45),
"bubble_vendor": (65, 55, 30),
"bubble_system": (25, 25, 25),
"slice_manual": (255, 165, 0),
"slice_auto": (0, 255, 0),
"slice_selection": (100, 100, 255),
"diff_added": (51, 230, 51),
"diff_removed": (230, 51, 51),
"diff_header": (77, 178, 255),
"text": (200, 200, 200),
"text_disabled": (130, 130, 130),
"status_success": (80, 255, 80),
"status_warning": (255, 152, 48),
"status_error": (255, 72, 64),
"status_info": (0, 255, 255),
"bubble_user": (30, 45, 75),
"bubble_ai": (35, 65, 45),
"bubble_vendor": (65, 55, 30),
"bubble_system": (25, 25, 25),
"slice_manual": (255, 165, 0),
"slice_auto": (0, 255, 0),
"slice_selection": (100, 100, 255),
"diff_added": (51, 230, 51),
"diff_removed": (230, 51, 51),
"diff_header": (77, 178, 255),
"table_header_text": (255, 255, 255),
"table_row_bg": (0, 0, 0),
"table_row_bg_alt": (10, 10, 10),
"table_row_bg": (0, 0, 0),
"table_row_bg_alt": (10, 10, 10),
}
if name in fallbacks:
rgb = fallbacks[name]
@@ -202,8 +202,8 @@ def get_color(name: str, alpha: float = 1.0) -> imgui.ImVec4:
def get_role_tint(role: str) -> imgui.ImVec4:
"""Returns a subtle background tint color based on the message role."""
mapping = {
"User": "bubble_user",
"AI": "bubble_ai",
"User": "bubble_user",
"AI": "bubble_ai",
"Vendor API": "bubble_vendor",
}
return get_color(mapping.get(role, "bubble_system"), alpha=0.6)
@@ -213,7 +213,6 @@ def apply(palette_name: str) -> None:
global _current_palette
_current_palette = palette_name
if palette_name == 'NERV':
from src import theme_nerv
theme_nerv.apply_nerv()
apply_syntax_palette(get_syntax_palette_for_theme(palette_name))
return
@@ -228,7 +227,7 @@ def apply(palette_name: str) -> None:
elif palette_name in _TOML_PALETTES:
colours = _TOML_COLOUR_CACHE.get(palette_name, {})
if not colours:
theme = _TOML_PALETTES[palette_name]
theme = _TOML_PALETTES[palette_name]
colours = _build_imgui_colour_dict(theme)
_TOML_COLOUR_CACHE[palette_name] = colours
imgui.style_colors_dark()
@@ -243,33 +242,33 @@ def apply(palette_name: str) -> None:
# 2. Professional tweaks
style = imgui.get_style()
style.window_rounding = 6.0
style.child_rounding = 4.0
style.frame_rounding = 4.0
style.popup_rounding = 4.0
style.window_rounding = 6.0
style.child_rounding = 4.0
style.frame_rounding = 4.0
style.popup_rounding = 4.0
style.scrollbar_rounding = 12.0
style.grab_rounding = 4.0
style.tab_rounding = 4.0
style.grab_rounding = 4.0
style.tab_rounding = 4.0
style.window_border_size = 1.0
style.frame_border_size = 1.0
style.popup_border_size = 1.0
style.frame_border_size = 1.0
style.popup_border_size = 1.0
win_bg = style.color_(imgui.Col_.window_bg)
win_bg = style.color_(imgui.Col_.window_bg)
win_bg.w = _transparency
style.set_color_(imgui.Col_.window_bg, win_bg)
for col_idx in [imgui.Col_.child_bg, imgui.Col_.frame_bg, imgui.Col_.popup_bg]:
c = style.color_(col_idx)
c = style.color_(col_idx)
c.w = _child_transparency
style.set_color_(col_idx, c)
style.window_padding = imgui.ImVec2(8.0, 8.0)
style.frame_padding = imgui.ImVec2(8.0, 4.0)
style.item_spacing = imgui.ImVec2(8.0, 4.0)
style.item_inner_spacing = imgui.ImVec2(4.0, 4.0)
style.scrollbar_size = 14.0
style.anti_aliased_lines = True
style.anti_aliased_fill = True
style.window_padding = imgui.ImVec2(8.0, 8.0)
style.frame_padding = imgui.ImVec2(8.0, 4.0)
style.item_spacing = imgui.ImVec2(8.0, 4.0)
style.item_inner_spacing = imgui.ImVec2(4.0, 4.0)
style.scrollbar_size = 14.0
style.anti_aliased_lines = True
style.anti_aliased_fill = True
style.anti_aliased_lines_use_tex = True
# 3. Sync syntax palette and clear markdown render cache
@@ -299,18 +298,18 @@ def get_palette_names() -> list[str]:
def save_to_config(config: dict) -> None:
"""Persist theme settings into the config dict."""
config.setdefault("theme", {})
config["theme"]["palette"] = _current_palette
config["theme"]["font_path"] = _current_font_path
config["theme"]["font_size"] = _current_font_size
config["theme"]["scale"] = _current_scale
config["theme"]["transparency"] = _transparency
config["theme"]["palette"] = _current_palette
config["theme"]["font_path"] = _current_font_path
config["theme"]["font_size"] = _current_font_size
config["theme"]["scale"] = _current_scale
config["theme"]["transparency"] = _transparency
config["theme"]["child_transparency"] = _child_transparency
tm = {}
for p in set(list(_brightness.keys()) + list(_contrast.keys()) + list(_gamma.keys())):
tm[p] = {
"brightness": _brightness.get(p, 1.0),
"contrast": _contrast.get(p, 1.0),
"gamma": _gamma.get(p, 1.0)
"contrast": _contrast.get(p, 1.0),
"gamma": _gamma.get(p, 1.0)
}
config["theme"]["tone_mapping"] = tm
@@ -322,15 +321,15 @@ def load_from_config(config: dict) -> None:
_current_palette = t.get("palette", "10x Dark")
if _current_palette in ("", "DPG Default"):
_current_palette = "10x Dark"
_current_font_path = t.get("font_path", "fonts/Inter-Regular.ttf")
_current_font_size = float(t.get("font_size", 16.0))
_current_scale = float(t.get("scale", 1.0))
_transparency = float(t.get("transparency", 1.0))
_current_font_path = t.get("font_path", "fonts/Inter-Regular.ttf")
_current_font_size = float(t.get("font_size", 16.0))
_current_scale = float(t.get("scale", 1.0))
_transparency = float(t.get("transparency", 1.0))
_child_transparency = float(t.get("child_transparency", 1.0))
tm = t.get("tone_mapping", {})
tm = t.get("tone_mapping", {})
_brightness = {p: float(v.get("brightness", 1.0)) for p, v in tm.items()}
_contrast = {p: float(v.get("contrast", 1.0)) for p, v in tm.items()}
_gamma = {p: float(v.get("gamma", 1.0)) for p, v in tm.items()}
_contrast = {p: float(v.get("contrast", 1.0)) for p, v in tm.items()}
_gamma = {p: float(v.get("gamma", 1.0)) for p, v in tm.items()}
# ------------------------------------------------------------------ external themes
@@ -341,8 +340,8 @@ def load_themes_from_disk() -> None:
loaded: dict[str, ThemeFile] = {}
if themes_dir.exists() and themes_dir.is_dir():
loaded.update(load_themes_from_dir(themes_dir, scope="global"))
_TOML_PALETTES = loaded
_TOML_COLOUR_CACHE = {name: _build_imgui_colour_dict(t) for name, t in loaded.items()}
_TOML_PALETTES = loaded
_TOML_COLOUR_CACHE = {name: _build_imgui_colour_dict(t) for name, t in loaded.items()}
_TOML_SEMANTIC_CACHE = {name: _build_semantic_colour_dict(t) for name, t in loaded.items()}
def get_syntax_palette_for_theme(theme_name: str) -> str:
+97 -97
View File
@@ -13,86 +13,86 @@ VALID_SYNTAX_PALETTES: tuple[str, ...] = ("dark", "light", "mariana", "retro_blu
@dataclass
class ThemePalette:
window_bg: tuple[int, int, int] = (0, 0, 0)
child_bg: tuple[int, int, int] = (0, 0, 0)
popup_bg: tuple[int, int, int] = (0, 0, 0)
border: tuple[int, int, int] = (60, 60, 60)
border_shadow: tuple[int, int, int] = (0, 0, 0)
frame_bg: tuple[int, int, int] = (45, 45, 45)
frame_bg_hovered: tuple[int, int, int] = (60, 60, 60)
frame_bg_active: tuple[int, int, int] = (75, 75, 75)
title_bg: tuple[int, int, int] = (40, 40, 40)
title_bg_active: tuple[int, int, int] = (60, 45, 15)
title_bg_collapsed: tuple[int, int, int] = (30, 30, 30)
menu_bar_bg: tuple[int, int, int] = (35, 35, 35)
scrollbar_bg: tuple[int, int, int] = (30, 30, 30)
scrollbar_grab: tuple[int, int, int] = (80, 80, 80)
scrollbar_grab_hovered: tuple[int, int, int] = (100, 100, 100)
scrollbar_grab_active: tuple[int, int, int] = (120, 120, 120)
check_mark: tuple[int, int, int] = (200, 200, 200)
slider_grab: tuple[int, int, int] = (60, 60, 60)
slider_grab_active: tuple[int, int, int] = (100, 100, 100)
button: tuple[int, int, int] = (60, 60, 60)
button_hovered: tuple[int, int, int] = (100, 100, 100)
button_active: tuple[int, int, int] = (120, 120, 120)
header: tuple[int, int, int] = (60, 60, 60)
header_hovered: tuple[int, int, int] = (100, 100, 100)
header_active: tuple[int, int, int] = (120, 120, 120)
separator: tuple[int, int, int] = (60, 60, 60)
separator_hovered: tuple[int, int, int] = (100, 100, 100)
separator_active: tuple[int, int, int] = (200, 200, 200)
resize_grip: tuple[int, int, int] = (60, 60, 60)
resize_grip_hovered: tuple[int, int, int] = (100, 100, 100)
resize_grip_active: tuple[int, int, int] = (200, 200, 200)
tab: tuple[int, int, int] = (60, 60, 60)
tab_hovered: tuple[int, int, int] = (100, 100, 100)
tab_selected: tuple[int, int, int] = (100, 100, 100)
tab_dimmed: tuple[int, int, int] = (60, 60, 60)
tab_dimmed_selected: tuple[int, int, int] = (100, 100, 100)
docking_preview: tuple[int, int, int] = (100, 100, 100)
docking_empty_bg: tuple[int, int, int] = (20, 20, 20)
text: tuple[int, int, int] = (200, 200, 200)
text_disabled: tuple[int, int, int] = (130, 130, 130)
text_selected_bg: tuple[int, int, int] = (60, 100, 150)
table_header_bg: tuple[int, int, int] = (55, 55, 55)
table_border_strong: tuple[int, int, int] = (60, 60, 60)
table_border_light: tuple[int, int, int] = (40, 40, 40)
table_row_bg: tuple[int, int, int] = (0, 0, 0)
table_row_bg_alt: tuple[int, int, int] = (10, 10, 10)
nav_cursor: tuple[int, int, int] = (100, 100, 100)
nav_windowing_dim_bg: tuple[int, int, int] = (20, 20, 20)
nav_windowing_highlight: tuple[int, int, int] = (200, 200, 200)
modal_window_dim_bg: tuple[int, int, int] = (10, 10, 10)
plot_lines: tuple[int, int, int] = (100, 100, 100)
plot_lines_hovered: tuple[int, int, int] = (200, 100, 100)
plot_histogram: tuple[int, int, int] = (100, 100, 100)
plot_histogram_hovered: tuple[int, int, int] = (200, 100, 100)
drag_drop_target: tuple[int, int, int] = (200, 200, 0)
drag_drop_target_bg: tuple[int, int, int] = (0, 0, 0)
input_text_cursor: tuple[int, int, int] = (200, 200, 200)
window_bg: tuple[int, int, int] = (0, 0, 0)
child_bg: tuple[int, int, int] = (0, 0, 0)
popup_bg: tuple[int, int, int] = (0, 0, 0)
border: tuple[int, int, int] = (60, 60, 60)
border_shadow: tuple[int, int, int] = (0, 0, 0)
frame_bg: tuple[int, int, int] = (45, 45, 45)
frame_bg_hovered: tuple[int, int, int] = (60, 60, 60)
frame_bg_active: tuple[int, int, int] = (75, 75, 75)
title_bg: tuple[int, int, int] = (40, 40, 40)
title_bg_active: tuple[int, int, int] = (60, 45, 15)
title_bg_collapsed: tuple[int, int, int] = (30, 30, 30)
menu_bar_bg: tuple[int, int, int] = (35, 35, 35)
scrollbar_bg: tuple[int, int, int] = (30, 30, 30)
scrollbar_grab: tuple[int, int, int] = (80, 80, 80)
scrollbar_grab_hovered: tuple[int, int, int] = (100, 100, 100)
scrollbar_grab_active: tuple[int, int, int] = (120, 120, 120)
check_mark: tuple[int, int, int] = (200, 200, 200)
slider_grab: tuple[int, int, int] = (60, 60, 60)
slider_grab_active: tuple[int, int, int] = (100, 100, 100)
button: tuple[int, int, int] = (60, 60, 60)
button_hovered: tuple[int, int, int] = (100, 100, 100)
button_active: tuple[int, int, int] = (120, 120, 120)
header: tuple[int, int, int] = (60, 60, 60)
header_hovered: tuple[int, int, int] = (100, 100, 100)
header_active: tuple[int, int, int] = (120, 120, 120)
separator: tuple[int, int, int] = (60, 60, 60)
separator_hovered: tuple[int, int, int] = (100, 100, 100)
separator_active: tuple[int, int, int] = (200, 200, 200)
resize_grip: tuple[int, int, int] = (60, 60, 60)
resize_grip_hovered: tuple[int, int, int] = (100, 100, 100)
resize_grip_active: tuple[int, int, int] = (200, 200, 200)
tab: tuple[int, int, int] = (60, 60, 60)
tab_hovered: tuple[int, int, int] = (100, 100, 100)
tab_selected: tuple[int, int, int] = (100, 100, 100)
tab_dimmed: tuple[int, int, int] = (60, 60, 60)
tab_dimmed_selected: tuple[int, int, int] = (100, 100, 100)
docking_preview: tuple[int, int, int] = (100, 100, 100)
docking_empty_bg: tuple[int, int, int] = (20, 20, 20)
text: tuple[int, int, int] = (200, 200, 200)
text_disabled: tuple[int, int, int] = (130, 130, 130)
text_selected_bg: tuple[int, int, int] = (60, 100, 150)
table_header_bg: tuple[int, int, int] = (55, 55, 55)
table_border_strong: tuple[int, int, int] = (60, 60, 60)
table_border_light: tuple[int, int, int] = (40, 40, 40)
table_row_bg: tuple[int, int, int] = (0, 0, 0)
table_row_bg_alt: tuple[int, int, int] = (10, 10, 10)
nav_cursor: tuple[int, int, int] = (100, 100, 100)
nav_windowing_dim_bg: tuple[int, int, int] = (20, 20, 20)
nav_windowing_highlight: tuple[int, int, int] = (200, 200, 200)
modal_window_dim_bg: tuple[int, int, int] = (10, 10, 10)
plot_lines: tuple[int, int, int] = (100, 100, 100)
plot_lines_hovered: tuple[int, int, int] = (200, 100, 100)
plot_histogram: tuple[int, int, int] = (100, 100, 100)
plot_histogram_hovered: tuple[int, int, int] = (200, 100, 100)
drag_drop_target: tuple[int, int, int] = (200, 200, 0)
drag_drop_target_bg: tuple[int, int, int] = (0, 0, 0)
input_text_cursor: tuple[int, int, int] = (200, 200, 200)
tab_dimmed_selected_overline: tuple[int, int, int] = (100, 100, 100)
tab_selected_overline: tuple[int, int, int] = (100, 100, 100)
text_link: tuple[int, int, int] = (60, 100, 150)
tree_lines: tuple[int, int, int] = (60, 60, 60)
unsaved_marker: tuple[int, int, int] = (200, 200, 200)
tab_selected_overline: tuple[int, int, int] = (100, 100, 100)
text_link: tuple[int, int, int] = (60, 100, 150)
tree_lines: tuple[int, int, int] = (60, 60, 60)
unsaved_marker: tuple[int, int, int] = (200, 200, 200)
# Semantic colors
status_success: tuple[int, int, int] = (80, 255, 80)
status_warning: tuple[int, int, int] = (255, 152, 48)
status_error: tuple[int, int, int] = (255, 72, 64)
status_info: tuple[int, int, int] = (0, 255, 255)
bubble_user: tuple[int, int, int] = (30, 45, 75)
bubble_ai: tuple[int, int, int] = (35, 65, 45)
bubble_vendor: tuple[int, int, int] = (65, 55, 30)
bubble_system: tuple[int, int, int] = (25, 25, 25)
slice_manual: tuple[int, int, int] = (255, 165, 0)
slice_auto: tuple[int, int, int] = (0, 255, 0)
status_success: tuple[int, int, int] = (80, 255, 80)
status_warning: tuple[int, int, int] = (255, 152, 48)
status_error: tuple[int, int, int] = (255, 72, 64)
status_info: tuple[int, int, int] = (0, 255, 255)
bubble_user: tuple[int, int, int] = (30, 45, 75)
bubble_ai: tuple[int, int, int] = (35, 65, 45)
bubble_vendor: tuple[int, int, int] = (65, 55, 30)
bubble_system: tuple[int, int, int] = (25, 25, 25)
slice_manual: tuple[int, int, int] = (255, 165, 0)
slice_auto: tuple[int, int, int] = (0, 255, 0)
slice_selection: tuple[int, int, int] = (100, 100, 255)
# Diff colors
diff_added: tuple[int, int, int] = (51, 230, 51)
diff_added: tuple[int, int, int] = (51, 230, 51)
diff_removed: tuple[int, int, int] = (230, 51, 51)
diff_header: tuple[int, int, int] = (77, 178, 255)
diff_header: tuple[int, int, int] = (77, 178, 255)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> ThemePalette:
@@ -108,12 +108,12 @@ class ThemePalette:
@dataclass
class ThemeFile:
name: str
palette: ThemePalette
name: str
palette: ThemePalette
syntax_palette: str
source_path: Path
scope: str
description: str = ""
source_path: Path
scope: str
description: str = ""
def __post_init__(self) -> None:
if self.syntax_palette not in VALID_SYNTAX_PALETTES:
@@ -124,12 +124,12 @@ class ThemeFile:
def with_scope(self, scope: str) -> ThemeFile:
return ThemeFile(
name=self.name,
palette=self.palette,
syntax_palette=self.syntax_palette,
source_path=self.source_path,
scope=scope,
description=self.description,
name = self.name,
palette = self.palette,
syntax_palette = self.syntax_palette,
source_path = self.source_path,
scope = scope,
description = self.description,
)
def to_dict(self) -> dict[str, Any]:
@@ -152,12 +152,12 @@ class ThemeFile:
f"must be one of {VALID_SYNTAX_PALETTES}"
)
return cls(
name=name,
palette=ThemePalette.from_dict(data["colors"]),
syntax_palette=syntax_palette,
source_path=source_path,
scope=scope,
description=str(data.get("description", "")),
name = name,
palette = ThemePalette.from_dict(data["colors"]),
syntax_palette = syntax_palette,
source_path = source_path,
scope = scope,
description = str(data.get("description", "")),
)
@@ -171,7 +171,7 @@ def load_theme_file(path: Path, scope: str) -> ThemeFile:
raise ValueError(f"failed to parse theme TOML {path}: {e}") from e
if not isinstance(data, dict):
raise ValueError(f"theme TOML {path} must be a top-level table")
name = data.get("name", path.stem)
name = data.get("name", path.stem)
theme = ThemeFile.from_dict(name, data, source_path=path, scope=scope)
return theme
@@ -204,14 +204,14 @@ def load_themes_from_toml(path: Path, scope: str) -> dict[str, ThemeFile]:
except Exception as e:
print(f"warning: failed to parse {path}: {e}", file=sys.stderr)
return out
if not isinstance(data, dict):
return out
if not isinstance(data, dict): return out
themes_sec = data.get("themes", {})
if not isinstance(themes_sec, dict):
return out
if not isinstance(themes_sec, dict): return out
for name, theme_data in themes_sec.items():
if not isinstance(theme_data, dict):
continue
if not isinstance(theme_data, dict): continue
try:
theme = ThemeFile.from_dict(name, theme_data, source_path=path, scope=scope)
except ValueError as e:
+63 -64
View File
@@ -6,84 +6,83 @@ def _c(r: int, g: int, b: int, a: int = 255) -> tuple[float, float, float, float
return (r / 255.0, g / 255.0, b / 255.0, a / 255.0)
NERV_ORANGE = _c(255, 152, 48)
DATA_GREEN = _c(80, 255, 80)
WIRE_CYAN = _c(32, 240, 255)
ALERT_RED = _c(255, 72, 64)
STEEL = _c(224, 224, 216)
BLACK = _c(0, 0, 0)
DATA_GREEN = _c(80, 255, 80)
WIRE_CYAN = _c(32, 240, 255)
ALERT_RED = _c(255, 72, 64)
STEEL = _c(224, 224, 216)
BLACK = _c(0, 0, 0)
NERV_PALETTE = {
imgui.Col_.text: STEEL,
imgui.Col_.window_bg: BLACK,
imgui.Col_.child_bg: BLACK,
imgui.Col_.popup_bg: BLACK,
imgui.Col_.border: NERV_ORANGE,
imgui.Col_.border_shadow: _c(0, 0, 0, 0),
imgui.Col_.frame_bg: BLACK,
imgui.Col_.frame_bg_hovered: _c(255, 152, 48, 40),
imgui.Col_.frame_bg_active: _c(255, 152, 48, 80),
imgui.Col_.title_bg: BLACK,
imgui.Col_.title_bg_active: BLACK,
imgui.Col_.title_bg_collapsed: BLACK,
imgui.Col_.menu_bar_bg: BLACK,
imgui.Col_.scrollbar_bg: BLACK,
imgui.Col_.scrollbar_grab: NERV_ORANGE,
imgui.Col_.scrollbar_grab_hovered: STEEL,
imgui.Col_.scrollbar_grab_active: WIRE_CYAN,
imgui.Col_.check_mark: DATA_GREEN,
imgui.Col_.slider_grab: WIRE_CYAN,
imgui.Col_.slider_grab_active: DATA_GREEN,
imgui.Col_.button: BLACK,
imgui.Col_.button_hovered: _c(255, 152, 48, 80),
imgui.Col_.button_active: _c(255, 152, 48, 120),
imgui.Col_.header: _c(255, 152, 48, 60),
imgui.Col_.header_hovered: _c(255, 152, 48, 100),
imgui.Col_.header_active: _c(255, 152, 48, 140),
imgui.Col_.separator: STEEL,
imgui.Col_.separator_hovered: WIRE_CYAN,
imgui.Col_.separator_active: DATA_GREEN,
imgui.Col_.resize_grip: NERV_ORANGE,
imgui.Col_.resize_grip_hovered: STEEL,
imgui.Col_.resize_grip_active: WIRE_CYAN,
imgui.Col_.tab: BLACK,
imgui.Col_.tab_hovered: _c(255, 152, 48, 100),
imgui.Col_.tab_selected: _c(255, 152, 48, 120),
imgui.Col_.tab_dimmed: BLACK,
imgui.Col_.tab_dimmed_selected: _c(255, 152, 48, 80),
imgui.Col_.plot_lines: WIRE_CYAN,
imgui.Col_.plot_lines_hovered: DATA_GREEN,
imgui.Col_.plot_histogram: DATA_GREEN,
imgui.Col_.plot_histogram_hovered: WIRE_CYAN,
imgui.Col_.text_selected_bg: _c(255, 152, 48, 100),
imgui.Col_.drag_drop_target: DATA_GREEN,
imgui.Col_.nav_cursor: NERV_ORANGE,
imgui.Col_.text: STEEL,
imgui.Col_.window_bg: BLACK,
imgui.Col_.child_bg: BLACK,
imgui.Col_.popup_bg: BLACK,
imgui.Col_.border: NERV_ORANGE,
imgui.Col_.border_shadow: _c(0, 0, 0, 0),
imgui.Col_.frame_bg: BLACK,
imgui.Col_.frame_bg_hovered: _c(255, 152, 48, 40),
imgui.Col_.frame_bg_active: _c(255, 152, 48, 80),
imgui.Col_.title_bg: BLACK,
imgui.Col_.title_bg_active: BLACK,
imgui.Col_.title_bg_collapsed: BLACK,
imgui.Col_.menu_bar_bg: BLACK,
imgui.Col_.scrollbar_bg: BLACK,
imgui.Col_.scrollbar_grab: NERV_ORANGE,
imgui.Col_.scrollbar_grab_hovered: STEEL,
imgui.Col_.scrollbar_grab_active: WIRE_CYAN,
imgui.Col_.check_mark: DATA_GREEN,
imgui.Col_.slider_grab: WIRE_CYAN,
imgui.Col_.slider_grab_active: DATA_GREEN,
imgui.Col_.button: BLACK,
imgui.Col_.button_hovered: _c(255, 152, 48, 80),
imgui.Col_.button_active: _c(255, 152, 48, 120),
imgui.Col_.header: _c(255, 152, 48, 60),
imgui.Col_.header_hovered: _c(255, 152, 48, 100),
imgui.Col_.header_active: _c(255, 152, 48, 140),
imgui.Col_.separator: STEEL,
imgui.Col_.separator_hovered: WIRE_CYAN,
imgui.Col_.separator_active: DATA_GREEN,
imgui.Col_.resize_grip: NERV_ORANGE,
imgui.Col_.resize_grip_hovered: STEEL,
imgui.Col_.resize_grip_active: WIRE_CYAN,
imgui.Col_.tab: BLACK,
imgui.Col_.tab_hovered: _c(255, 152, 48, 100),
imgui.Col_.tab_selected: _c(255, 152, 48, 120),
imgui.Col_.tab_dimmed: BLACK,
imgui.Col_.tab_dimmed_selected: _c(255, 152, 48, 80),
imgui.Col_.plot_lines: WIRE_CYAN,
imgui.Col_.plot_lines_hovered: DATA_GREEN,
imgui.Col_.plot_histogram: DATA_GREEN,
imgui.Col_.plot_histogram_hovered: WIRE_CYAN,
imgui.Col_.text_selected_bg: _c(255, 152, 48, 100),
imgui.Col_.drag_drop_target: DATA_GREEN,
imgui.Col_.nav_cursor: NERV_ORANGE,
imgui.Col_.nav_windowing_highlight: DATA_GREEN,
imgui.Col_.nav_windowing_dim_bg: _c(0, 0, 0, 150),
imgui.Col_.modal_window_dim_bg: _c(0, 0, 0, 150),
imgui.Col_.nav_windowing_dim_bg: _c(0, 0, 0, 150),
imgui.Col_.modal_window_dim_bg: _c(0, 0, 0, 150),
}
def apply_nerv() -> None:
"""
Apply NERV theme with hard edges and specific palette.
[C: tests/test_theme_nerv.py:test_apply_nerv_sets_rounding_and_colors]
Apply NERV theme with hard edges and specific palette.
[C: tests/test_theme_nerv.py:test_apply_nerv_sets_rounding_and_colors]
"""
style = imgui.get_style()
for col_enum, rgba in NERV_PALETTE.items():
style.set_color_(col_enum, imgui.ImVec4(*rgba))
# Hard Edges
style.window_rounding = 0.0
style.child_rounding = 0.0
style.frame_rounding = 0.0
style.popup_rounding = 0.0
style.window_rounding = 0.0
style.child_rounding = 0.0
style.frame_rounding = 0.0
style.popup_rounding = 0.0
style.scrollbar_rounding = 0.0
style.grab_rounding = 0.0
style.tab_rounding = 0.0
style.grab_rounding = 0.0
style.tab_rounding = 0.0
# Border sizes
style.window_border_size = 1.0
style.frame_border_size = 1.0
style.popup_border_size = 1.0
style.child_border_size = 1.0
style.tab_border_size = 1.0
style.frame_border_size = 1.0
style.popup_border_size = 1.0
style.child_border_size = 1.0
style.tab_border_size = 1.0
+13 -15
View File
@@ -11,20 +11,19 @@ class CRTFilter:
def render(self, width: float, height: float):
"""
[C: tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render]
[C: tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render]
"""
if not self.enabled:
return
if not self.enabled: return
draw_list = imgui.get_foreground_draw_list()
# 1. Enhanced Scanlines (Horizontal)
# Vary thickness and alpha for a more "analog" feel
for y in range(0, int(height), 2):
# Thicker/Darker every 4 pixels
is_major = (y % 4 == 0)
alpha = 0.08 if is_major else 0.04
thickness = 1.2 if is_major else 0.8
s_color = imgui.get_color_u32((0.0, 0.0, 0.0, alpha))
is_major = (y % 4 == 0)
alpha = 0.08 if is_major else 0.04
thickness = 1.2 if is_major else 0.8
s_color = imgui.get_color_u32((0.0, 0.0, 0.0, alpha))
draw_list.add_line((0.0, float(y)), (float(width), float(y)), s_color, thickness)
# 2. Shadow Mask (Vertical)
@@ -38,12 +37,11 @@ class CRTFilter:
v_steps = 20
for i in range(v_steps):
# Exponential alpha for smoother falloff
alpha = (i / v_steps) ** 3.0 * 0.25
alpha = (i / v_steps) ** 3.0 * 0.25
v_color = imgui.get_color_u32((0.0, 0.0, 0.0, alpha))
# Inset and rounding grow to simulate tube curvature
inset = (v_steps - i) * 4.5
rounding = 60.0 + (v_steps - i) * 8.0
inset = (v_steps - i) * 4.5
rounding = 60.0 + (v_steps - i) * 8.0
thickness = 15.0
if width > inset * 2.0 and height > inset * 2.0:
@@ -70,7 +68,7 @@ class StatusFlicker:
def get_alpha(self) -> float:
# Modulate between 0.7 and 1.0 using sin wave
"""
[C: tests/test_theme_nerv_fx.py:TestThemeNervFx.test_status_flicker_get_alpha]
[C: tests/test_theme_nerv_fx.py:TestThemeNervFx.test_status_flicker_get_alpha]
"""
return 0.85 + 0.15 * math.sin(time.time() * 20.0)
@@ -80,13 +78,13 @@ class AlertPulsing:
def update(self, status: str):
"""
[C: tests/test_spawn_interception_v2.py:MockDialog.wait, tests/test_theme_nerv_alert.py:test_alert_pulsing_update, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_update]
[C: tests/test_spawn_interception_v2.py:MockDialog.wait, tests/test_theme_nerv_alert.py:test_alert_pulsing_update, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_update]
"""
self.active = status.lower().startswith("error")
def render(self, width: float, height: float):
"""
[C: tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render]
[C: tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render]
"""
if not self.active:
return
@@ -96,4 +94,4 @@ class AlertPulsing:
# multiply by (0.2 - 0.05) = 0.15 and add 0.05
alpha = 0.05 + 0.15 * ((math.sin(time.time() * 4.0) + 1.0) / 2.0)
color = imgui.get_color_u32((1.0, 0.0, 0.0, alpha))
draw_list.add_rect((0.0, 0.0), (width, height), color, 0.0, 0, 10.0)
draw_list.add_rect((0.0, 0.0), (width, height), color, 0.0, 0, 10.0)
+7 -12
View File
@@ -7,18 +7,15 @@ from src.models import ThinkingSegment
def parse_thinking_trace(text: str) -> Tuple[List[ThinkingSegment], str]:
"""
Parses thinking segments from text and returns (segments, response_content).
Support extraction of thinking traces from <thinking>...</thinking>, <thought>...</thought>,
and blocks prefixed with Thinking:.
[C: tests/test_thinking_trace.py:test_parse_empty_response, tests/test_thinking_trace.py:test_parse_multiple_markers, tests/test_thinking_trace.py:test_parse_no_thinking, tests/test_thinking_trace.py:test_parse_text_thinking_prefix, tests/test_thinking_trace.py:test_parse_thinking_with_empty_response, tests/test_thinking_trace.py:test_parse_xml_thinking_tag, tests/test_thinking_trace.py:test_parse_xml_thought_tag]
Parses thinking segments from text and returns (segments, response_content).
Support extraction of thinking traces from <thinking>...</thinking>, <thought>...</thought>,
and blocks prefixed with Thinking:.
[C: tests/test_thinking_trace.py:test_parse_empty_response, tests/test_thinking_trace.py:test_parse_multiple_markers, tests/test_thinking_trace.py:test_parse_no_thinking, tests/test_thinking_trace.py:test_parse_text_thinking_prefix, tests/test_thinking_trace.py:test_parse_thinking_with_empty_response, tests/test_thinking_trace.py:test_parse_xml_thinking_tag, tests/test_thinking_trace.py:test_parse_xml_thought_tag]
"""
segments = []
# 1. Extract <thinking> and <thought> tags
current_text = text
# Combined pattern for tags
tag_pattern = re.compile(r'<(thinking|thought)>(.*?)</\1>', re.DOTALL | re.IGNORECASE)
@@ -26,7 +23,7 @@ def parse_thinking_trace(text: str) -> Tuple[List[ThinkingSegment], str]:
found_segments = []
def replace_func(match):
marker = match.group(1).lower()
marker = match.group(1).lower()
content = match.group(2).strip()
found_segments.append(ThinkingSegment(content=content, marker=marker))
return ""
@@ -46,8 +43,7 @@ def parse_thinking_trace(text: str) -> Tuple[List[ThinkingSegment], str]:
def replace_func(match):
content = match.group(1).strip()
if content:
found_segments.append(ThinkingSegment(content=content, marker="Thinking:"))
if content: found_segments.append(ThinkingSegment(content=content, marker="Thinking:"))
return "\n\n"
res = thinking_colon_pattern.sub(replace_func, txt)
@@ -55,5 +51,4 @@ def parse_thinking_trace(text: str) -> Tuple[List[ThinkingSegment], str]:
colon_segments, final_remaining = extract_colon_blocks(remaining)
segments.extend(colon_segments)
return segments, final_remaining.strip()
return segments, final_remaining.strip()
+9 -15
View File
@@ -6,7 +6,7 @@ from src.models import Tool, ToolPreset, BiasProfile
class ToolBiasEngine:
def apply_semantic_nudges(self, tool_definitions: List[Dict[str, Any]], preset: ToolPreset) -> List[Dict[str, Any]]:
"""
[C: tests/test_tool_bias.py:test_apply_semantic_nudges, tests/test_tool_bias.py:test_parameter_bias_nudging]
[C: tests/test_tool_bias.py:test_apply_semantic_nudges, tests/test_tool_bias.py:test_parameter_bias_nudging]
"""
weight_map = {
5: "[HIGH PRIORITY] ",
@@ -42,7 +42,7 @@ class ToolBiasEngine:
def generate_tooling_strategy(self, preset: ToolPreset, global_bias: BiasProfile) -> str:
"""
[C: tests/test_tool_bias.py:test_generate_tooling_strategy]
[C: tests/test_tool_bias.py:test_generate_tooling_strategy]
"""
lines = ["### Tooling Strategy"]
@@ -51,23 +51,17 @@ class ToolBiasEngine:
for cat_tools in preset.categories.values():
for t in cat_tools:
if not isinstance(t, Tool): continue
if t.weight >= 5:
preferred.append(f"{t.name} [HIGH PRIORITY]")
elif t.weight == 4:
preferred.append(f"{t.name} [PREFERRED]")
elif t.weight == 2:
low_priority.append(f"{t.name} [NOT RECOMMENDED]")
elif t.weight <= 1:
low_priority.append(f"{t.name} [LOW PRIORITY]")
if t.weight >= 5: preferred.append(f"{t.name} [HIGH PRIORITY]")
elif t.weight == 4: preferred.append(f"{t.name} [PREFERRED]")
elif t.weight == 2: low_priority.append(f"{t.name} [NOT RECOMMENDED]")
elif t.weight <= 1: low_priority.append(f"{t.name} [LOW PRIORITY]")
if preferred:
lines.append(f"Preferred tools: {', '.join(preferred)}.")
if low_priority:
lines.append(f"Low-priority tools: {', '.join(low_priority)}.")
if preferred: lines.append(f"Preferred tools: {', '.join(preferred)}.")
if low_priority: lines.append(f"Low-priority tools: {', '.join(low_priority)}.")
if global_bias.category_multipliers:
lines.append("Category focus multipliers:")
for cat, mult in global_bias.category_multipliers.items():
lines.append(f"- {cat}: {mult}x")
return "\n\n".join(lines)
return "\n\n".join(lines)
+11 -12
View File
@@ -14,7 +14,7 @@ class ToolPresetManager:
def _get_path(self, scope: str) -> Path:
"""
[C: src/workspace_manager.py:WorkspaceManager.delete_profile, src/workspace_manager.py:WorkspaceManager.save_profile]
[C: src/workspace_manager.py:WorkspaceManager.delete_profile, src/workspace_manager.py:WorkspaceManager.save_profile]
"""
if scope == "global":
return paths.get_global_tool_presets_path()
@@ -41,7 +41,7 @@ class ToolPresetManager:
def load_all_presets(self) -> Dict[str, ToolPreset]:
"""
[C: tests/test_tool_preset_manager.py:test_load_all_presets_merged]
[C: tests/test_tool_preset_manager.py:test_load_all_presets_merged]
"""
global_path = paths.get_global_tool_presets_path()
global_data = self._read_raw(global_path).get("presets", {})
@@ -62,15 +62,14 @@ class ToolPresetManager:
def load_all(self) -> Dict[str, ToolPreset]:
"""
Backward compatibility for load_all().
[C: tests/test_persona_manager.py:test_delete_persona, tests/test_persona_manager.py:test_load_all_merged, tests/test_persona_manager.py:test_save_persona, tests/test_preset_manager.py:test_delete_preset, tests/test_preset_manager.py:test_load_all_merged, tests/test_preset_manager.py:test_save_preset_global, tests/test_preset_manager.py:test_save_preset_project, tests/test_presets.py:TestPresetManager.test_delete_preset, tests/test_presets.py:TestPresetManager.test_project_overwrites_global, tests/test_presets.py:TestPresetManager.test_save_and_load_global, tests/test_presets.py:TestPresetManager.test_save_and_load_project]
Backward compatibility for load_all().
[C: tests/test_persona_manager.py:test_delete_persona, tests/test_persona_manager.py:test_load_all_merged, tests/test_persona_manager.py:test_save_persona, tests/test_preset_manager.py:test_delete_preset, tests/test_preset_manager.py:test_load_all_merged, tests/test_preset_manager.py:test_save_preset_global, tests/test_preset_manager.py:test_save_preset_project, tests/test_presets.py:TestPresetManager.test_delete_preset, tests/test_presets.py:TestPresetManager.test_project_overwrites_global, tests/test_presets.py:TestPresetManager.test_save_and_load_global, tests/test_presets.py:TestPresetManager.test_save_and_load_project]
"""
return self.load_all_presets()
def save_preset(self, preset: ToolPreset, scope: str = "project") -> None:
"""
[C: tests/test_preset_manager.py:test_save_preset_global, tests/test_preset_manager.py:test_save_preset_project, tests/test_preset_manager.py:test_save_preset_project_no_root, tests/test_presets.py:TestPresetManager.test_delete_preset, tests/test_presets.py:TestPresetManager.test_project_overwrites_global, tests/test_presets.py:TestPresetManager.test_save_and_load_global, tests/test_presets.py:TestPresetManager.test_save_and_load_project]
[C: tests/test_preset_manager.py:test_save_preset_global, tests/test_preset_manager.py:test_save_preset_project, tests/test_preset_manager.py:test_save_preset_project_no_root, tests/test_presets.py:TestPresetManager.test_delete_preset, tests/test_presets.py:TestPresetManager.test_project_overwrites_global, tests/test_presets.py:TestPresetManager.test_save_and_load_global, tests/test_presets.py:TestPresetManager.test_save_and_load_project]
"""
path = self._get_path(scope)
data = self._read_raw(path)
@@ -81,7 +80,7 @@ class ToolPresetManager:
def delete_preset(self, name: str, scope: str = "project") -> None:
"""
[C: tests/test_preset_manager.py:test_delete_preset, tests/test_presets.py:TestPresetManager.test_delete_preset]
[C: tests/test_preset_manager.py:test_delete_preset, tests/test_presets.py:TestPresetManager.test_delete_preset]
"""
path = self._get_path(scope)
data = self._read_raw(path)
@@ -91,7 +90,7 @@ class ToolPresetManager:
def load_all_bias_profiles(self) -> Dict[str, BiasProfile]:
"""
[C: tests/test_tool_preset_manager.py:test_bias_profiles_merged, tests/test_tool_preset_manager.py:test_delete_bias_profile, tests/test_tool_preset_manager.py:test_save_bias_profile]
[C: tests/test_tool_preset_manager.py:test_bias_profiles_merged, tests/test_tool_preset_manager.py:test_delete_bias_profile, tests/test_tool_preset_manager.py:test_save_bias_profile]
"""
global_path = paths.get_global_tool_presets_path()
global_data = self._read_raw(global_path).get("bias_profiles", {})
@@ -110,7 +109,7 @@ class ToolPresetManager:
for name, config in project_data.items():
if isinstance(config, dict):
cfg = dict(config)
if "name" not in cfg:
if "name" not in cfg:
cfg["name"] = name
profiles[name] = BiasProfile.from_dict(cfg)
@@ -118,7 +117,7 @@ class ToolPresetManager:
def save_bias_profile(self, profile: BiasProfile, scope: str = "project") -> None:
"""
[C: tests/test_tool_preset_manager.py:test_save_bias_profile]
[C: tests/test_tool_preset_manager.py:test_save_bias_profile]
"""
path = self._get_path(scope)
data = self._read_raw(path)
@@ -129,10 +128,10 @@ class ToolPresetManager:
def delete_bias_profile(self, name: str, scope: str = "project") -> None:
"""
[C: tests/test_tool_preset_manager.py:test_delete_bias_profile]
[C: tests/test_tool_preset_manager.py:test_delete_bias_profile]
"""
path = self._get_path(scope)
data = self._read_raw(path)
if "bias_profiles" in data and name in data["bias_profiles"]:
del data["bias_profiles"][name]
self._write_raw(path, data)
self._write_raw(path, data)
+38 -38
View File
@@ -6,10 +6,10 @@ class VendorMetric:
"""Atomic vendor-state metric.
[C: src/gui_2.py:render_vendor_state]
"""
key: str
label: str
value: str
state: str
key: str
label: str
value: str
state: str
tooltip: str
def get_vendor_state(app) -> list[VendorMetric]:
@@ -18,64 +18,64 @@ def get_vendor_state(app) -> list[VendorMetric]:
"""
out: list[VendorMetric] = []
out.append(VendorMetric(
key="provider_model",
label="Provider / Model",
value=f"{app.current_provider} / {app.current_model}",
state="info",
tooltip="The vendor and model that will handle the next request."
key = "provider_model",
label = "Provider / Model",
value = f"{app.current_provider} / {app.current_model}",
state = "info",
tooltip = "The vendor and model that will handle the next request."
))
ctrl = getattr(app, "controller", None)
tt = getattr(ctrl, "token_tracker", None) if ctrl else None
tt = getattr(ctrl, "token_tracker", None) if ctrl else None
if tt and getattr(tt, "limit", 0):
pct = 100.0 * getattr(tt, "used", 0) / tt.limit
pct = 100.0 * getattr(tt, "used", 0) / tt.limit
state = "warn" if pct > 75 else "ok"
out.append(VendorMetric(
key="context_window",
label="Context Window",
value=f"{tt.used:,} / {tt.limit:,} ({pct:.0f}%)",
state=state,
tooltip="Used vs total context window for the current session."
key = "context_window",
label = "Context Window",
value = f"{tt.used:,} / {tt.limit:,} ({pct:.0f}%)",
state = state,
tooltip = "Used vs total context window for the current session."
))
else:
out.append(VendorMetric(
key="context_window", label="Context Window", value="", state="info",
tooltip="No token tracker attached for the current provider."
key = "context_window", label="Context Window", value="", state="info",
tooltip = "No token tracker attached for the current provider."
))
if tt is not None:
hits = getattr(tt, "cache_hits", 0)
miss = getattr(tt, "cache_misses", 0)
hits = getattr(tt, "cache_hits", 0)
miss = getattr(tt, "cache_misses", 0)
total = hits + miss
rate = (100.0 * hits / total) if total else 0.0
rate = (100.0 * hits / total) if total else 0.0
out.append(VendorMetric(
key="cache", label="Cache Hit Rate",
value=f"{rate:.0f}% ({hits:,}/{total:,})",
state="ok" if rate > 50 else "info",
tooltip="Server-side prompt cache hit rate for the current session."
key = "cache", label="Cache Hit Rate",
value = f"{rate:.0f}% ({hits:,}/{total:,})",
state = "ok" if rate > 50 else "info",
tooltip = "Server-side prompt cache hit rate for the current session."
))
else:
out.append(VendorMetric(
key="cache", label="Cache Hit Rate", value="", state="info",
tooltip="No token tracker attached for the current provider."
key = "cache", label="Cache Hit Rate", value="", state="info",
tooltip = "No token tracker attached for the current provider."
))
quota = (getattr(ctrl, "vendor_quota", {}) or {}) if ctrl else {}
quota = (getattr(ctrl, "vendor_quota", {}) or {}) if ctrl else {}
pct_left = quota.get("remaining_pct")
if pct_left is None:
out.append(VendorMetric(
key="quota", label="Vendor Quota", value="", state="info",
tooltip="Vendor did not report quota for the current billing period."
key = "quota", label="Vendor Quota", value="", state="info",
tooltip = "Vendor did not report quota for the current billing period."
))
else:
out.append(VendorMetric(
key="quota", label="Vendor Quota",
value=f"{pct_left}% remaining",
state="ok" if pct_left > 25 else "warn",
tooltip="Approximate quota remaining for the current billing period."
key = "quota", label="Vendor Quota",
value = f"{pct_left}% remaining",
state = "ok" if pct_left > 25 else "warn",
tooltip = "Approximate quota remaining for the current billing period."
))
err = getattr(ctrl, "last_error", None) if ctrl else None
out.append(VendorMetric(
key="last_error", label="Last Error",
value=err.get("class", "none") if err else "none",
state="error" if err else "ok",
tooltip=err.get("message", "No error since session start.") if err else "No error since session start."
key = "last_error", label="Last Error",
value = err.get("class", "none") if err else "none",
state = "error" if err else "ok",
tooltip = err.get("message", "No error since session start.") if err else "No error since session start."
))
return out
+4 -5
View File
@@ -29,9 +29,8 @@ class WorkspaceManager:
def load_all_profiles(self) -> Dict[str, WorkspaceProfile]:
"""
Merges global and project profiles into a single dictionary.
[C: tests/test_workspace_manager.py:test_delete_profile, tests/test_workspace_manager.py:test_load_all_profiles_merged, tests/test_workspace_manager.py:test_save_profile_global_and_project]
Merges global and project profiles into a single dictionary.
[C: tests/test_workspace_manager.py:test_delete_profile, tests/test_workspace_manager.py:test_load_all_profiles_merged, tests/test_workspace_manager.py:test_save_profile_global_and_project]
"""
profiles = {}
@@ -50,7 +49,7 @@ class WorkspaceManager:
def save_profile(self, profile: WorkspaceProfile, scope: str = "project") -> None:
"""
[C: tests/test_workspace_manager.py:test_delete_profile, tests/test_workspace_manager.py:test_save_profile_global_and_project]
[C: tests/test_workspace_manager.py:test_delete_profile, tests/test_workspace_manager.py:test_save_profile_global_and_project]
"""
path = self._get_path(scope)
data = self._load_file(path)
@@ -62,7 +61,7 @@ class WorkspaceManager:
def delete_profile(self, name: str, scope: str = "project") -> None:
"""
[C: tests/test_workspace_manager.py:test_delete_profile]
[C: tests/test_workspace_manager.py:test_delete_profile]
"""
path = self._get_path(scope)
data = self._load_file(path)