feat(rag): implement incremental and parallel indexing performance optimizations
This commit is contained in:
@@ -41,6 +41,6 @@
|
||||
## Phase 4: Refinement & Advanced RAG
|
||||
- [x] Task: Implement support for external RAG APIs/MCP servers. f57e2fe
|
||||
- [x] Create a bridge in `src/rag_engine.py` to call external RAG tools via the MCP interface. f57e2fe
|
||||
- [ ] Task: Optimize indexing performance for large projects (e.g., incremental updates, parallel chunking).
|
||||
- [x] Task: Optimize indexing performance for large projects (e.g., incremental updates, parallel chunking). f57e2fe
|
||||
- [ ] Task: Perform a final end-to-end verification with a large codebase.
|
||||
- [ ] Task: Conductor - User Manual Verification 'Phase 4: Refinement & Advanced RAG' (Protocol in workflow.md)
|
||||
|
||||
+15
-1
@@ -658,9 +658,23 @@ class AppController:
|
||||
def _run():
    """Background worker: incrementally (re)index tracked files in parallel,
    then purge index entries for files that are no longer tracked.

    Status is reported via self._set_rag_status; any failure is caught and
    surfaced as an "error: ..." status instead of propagating.
    """
    try:
        self._set_rag_status("indexing...")
        import concurrent.futures

        # 1. Incremental indexing of current files in parallel.
        #    BUG FIX: the previous version also called index_file(path)
        #    synchronously before submitting it to the pool, indexing every
        #    file twice and serializing the work. Only submit to the pool.
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            futures = []
            for f in self.files:
                # Tracked entries may be objects with .path or plain strings.
                path = f.path if hasattr(f, "path") else str(f)
                futures.append(executor.submit(self.rag_engine.index_file, path))
            concurrent.futures.wait(futures)

        # 2. Cleanup stale entries (files no longer tracked).
        indexed_paths = self.rag_engine.get_all_indexed_paths()
        current_paths = {f.path if hasattr(f, "path") else str(f) for f in self.files}
        stale_paths = [p for p in indexed_paths if p not in current_paths]
        if stale_paths:
            self.rag_engine.delete_documents_by_path(stale_paths)

        self._set_rag_status("ready")
    except Exception as e:
        # Best-effort: report the failure through the status channel.
        self._set_rag_status(f"error: {e}")
|
||||
|
||||
+29
-1
@@ -149,6 +149,20 @@ class RAGEngine:
|
||||
if not os.path.exists(full_path):
|
||||
return
|
||||
|
||||
try:
|
||||
mtime = os.path.getmtime(full_path)
|
||||
except Exception:
|
||||
return
|
||||
|
||||
# Incremental check: see if we already have this file with the same mtime
|
||||
try:
|
||||
res = self.collection.get(where={"path": file_path}, limit=1, include=["metadatas"])
|
||||
if res and res["metadatas"] and res["metadatas"][0]:
|
||||
if res["metadatas"][0].get("mtime") == mtime:
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
with open(full_path, "r", encoding="utf-8", errors="ignore") as f:
|
||||
content = f.read()
|
||||
@@ -167,7 +181,7 @@ class RAGEngine:
|
||||
return
|
||||
|
||||
ids = [f"{file_path}_{i}" for i in range(len(chunks))]
|
||||
metadatas = [{"path": file_path, "chunk": i} for i in range(len(chunks))]
|
||||
metadatas = [{"path": file_path, "chunk": i, "mtime": mtime} for i in range(len(chunks))]
|
||||
self.add_documents(ids, chunks, metadatas)
|
||||
|
||||
def _search_mcp(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
|
||||
@@ -216,3 +230,17 @@ class RAGEngine:
|
||||
if not self.config.enabled or self.collection == "mock":
|
||||
return
|
||||
self.collection.delete(ids=ids)
|
||||
|
||||
def get_all_indexed_paths(self) -> List[str]:
    """Return the unique file paths currently present in the index.

    Paths are gathered from each chunk's metadata; order is unspecified.
    Returns an empty list when RAG is disabled, when the collection is the
    mock backend, or when the store holds no metadata.
    """
    if not self.config.enabled or self.collection == "mock":
        return []
    result = self.collection.get(include=["metadatas"])
    if not result or not result["metadatas"]:
        return []
    unique_paths = set()
    for meta in result["metadatas"]:
        path = meta.get("path")
        if path:
            unique_paths.add(path)
    return list(unique_paths)
|
||||
|
||||
def delete_documents_by_path(self, file_paths: List[str]):
    """Remove every indexed chunk belonging to each of the given file paths.

    Silently does nothing when RAG is disabled or the collection is the
    mock backend. Deletion is issued once per path via a metadata filter.
    """
    if not self.config.enabled or self.collection == "mock":
        return
    remove = self.collection.delete
    for stale in file_paths:
        remove(where={"path": stale})
|
||||
|
||||
Reference in New Issue
Block a user