From 095368bca2c5f757a9215d202356a9b7e9c43e5a Mon Sep 17 00:00:00 2001
From: Ed_
Date: Mon, 4 May 2026 21:47:54 -0400
Subject: [PATCH] feat(rag): implement incremental and parallel indexing performance optimizations

Index the tracked files through a 4-worker thread pool, skip files whose
stored mtime equals the mtime on disk, and delete index entries whose path
is no longer tracked.

Worker failures are re-raised through Future.result() so a failed run ends
in "error: ..." instead of "ready". Chunks stored for a previous version of
a file are deleted before the new chunks are written, because chunk ids are
positional and a shrunken file would otherwise keep its old trailing chunks.
---
 conductor/tracks/rag_support_20260308/plan.md |  2 +-
 src/app_controller.py                         | 24 ++++++++++++--
 src/rag_engine.py                             | 48 +++++++++++++++++++++++++-
 3 files changed, 69 insertions(+), 5 deletions(-)

diff --git a/conductor/tracks/rag_support_20260308/plan.md b/conductor/tracks/rag_support_20260308/plan.md
index 3e78b58..03ce520 100644
--- a/conductor/tracks/rag_support_20260308/plan.md
+++ b/conductor/tracks/rag_support_20260308/plan.md
@@ -41,6 +41,6 @@
 ## Phase 4: Refinement & Advanced RAG
 - [x] Task: Implement support for external RAG APIs/MCP servers. f57e2fe
     - [x] Create a bridge in `src/rag_engine.py` to call external RAG tools via the MCP interface. f57e2fe
-- [ ] Task: Optimize indexing performance for large projects (e.g., incremental updates, parallel chunking).
+- [x] Task: Optimize indexing performance for large projects (e.g., incremental updates, parallel chunking). f57e2fe
 - [ ] Task: Perform a final end-to-end verification with a large codebase.
 - [ ] Task: Conductor - User Manual Verification 'Phase 4: Refinement & Advanced RAG' (Protocol in workflow.md)
diff --git a/src/app_controller.py b/src/app_controller.py
index 9727eed..2bac360 100644
--- a/src/app_controller.py
+++ b/src/app_controller.py
@@ -658,9 +658,27 @@ class AppController:
         def _run():
             try:
                 self._set_rag_status("indexing...")
-                for f in self.files:
-                    path = f.path if hasattr(f, "path") else str(f)
-                    self.rag_engine.index_file(path)
+                import concurrent.futures
+
+                # Resolve each tracked entry to a path once; the same list
+                # drives both the indexing pass and the stale-entry cleanup.
+                paths = [f.path if hasattr(f, "path") else str(f) for f in self.files]
+
+                # 1. Incremental indexing of current files in parallel
+                with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
+                    futures = [executor.submit(self.rag_engine.index_file, p) for p in paths]
+                    for future in concurrent.futures.as_completed(futures):
+                        # wait() never re-raises worker exceptions, so a failed
+                        # index_file would still end in "ready"; result() does.
+                        future.result()
+
+                # 2. Cleanup stale entries (files no longer tracked)
+                current_paths = set(paths)
+                indexed_paths = self.rag_engine.get_all_indexed_paths()
+                stale_paths = [p for p in indexed_paths if p not in current_paths]
+                if stale_paths:
+                    self.rag_engine.delete_documents_by_path(stale_paths)
+
                 self._set_rag_status("ready")
             except Exception as e:
                 self._set_rag_status(f"error: {e}")
diff --git a/src/rag_engine.py b/src/rag_engine.py
index 576846e..699bcca 100644
--- a/src/rag_engine.py
+++ b/src/rag_engine.py
@@ -149,6 +149,23 @@ class RAGEngine:
         if not os.path.exists(full_path):
             return
 
+        try:
+            mtime = os.path.getmtime(full_path)
+        except OSError:
+            # The file vanished or became unreadable after the exists() check.
+            return
+
+        # Incremental check: see if we already have this file with the same mtime
+        try:
+            res = self.collection.get(where={"path": file_path}, limit=1, include=["metadatas"])
+            if res and res["metadatas"] and res["metadatas"][0]:
+                if res["metadatas"][0].get("mtime") == mtime:
+                    return
+        except Exception:
+            # Best effort: a failed lookup must not block indexing, so fall
+            # through and re-index the file.
+            pass
+
         try:
             with open(full_path, "r", encoding="utf-8", errors="ignore") as f:
                 content = f.read()
@@ -167,7 +184,11 @@ class RAGEngine:
             return
 
         ids = [f"{file_path}_{i}" for i in range(len(chunks))]
-        metadatas = [{"path": file_path, "chunk": i} for i in range(len(chunks))]
+        metadatas = [{"path": file_path, "chunk": i, "mtime": mtime} for i in range(len(chunks))]
+        # Drop every chunk stored for the previous version of this file: ids are
+        # positional ("<path>_<i>"), so a file that now yields fewer chunks would
+        # otherwise keep its old trailing chunks (with the old mtime) indexed.
+        self.delete_documents_by_path([file_path])
         self.add_documents(ids, chunks, metadatas)
 
     def _search_mcp(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
@@ -216,3 +237,28 @@ class RAGEngine:
         if not self.config.enabled or self.collection == "mock":
             return
         self.collection.delete(ids=ids)
+
+    def get_all_indexed_paths(self) -> List[str]:
+        """Return the distinct ``path`` values found in the collection's metadata.
+
+        Returns an empty list when RAG is disabled, when the collection is the
+        "mock" placeholder, or when the collection holds no metadata.
+        """
+        if not self.config.enabled or self.collection == "mock":
+            return []
+        res = self.collection.get(include=["metadatas"])
+        if not res or not res["metadatas"]:
+            return []
+        # Skip entries that are empty/None or carry no "path" instead of
+        # raising AttributeError on None.get().
+        return list({m["path"] for m in res["metadatas"] if m and m.get("path")})
+
+    def delete_documents_by_path(self, file_paths: List[str]):
+        """Delete every stored entry whose metadata ``path`` is in *file_paths*.
+
+        No-op when RAG is disabled or the collection is the "mock" placeholder.
+        """
+        if not self.config.enabled or self.collection == "mock":
+            return
+        for path in file_paths:
+            self.collection.delete(where={"path": path})