Files
manual_slop/src/rag_engine.py
T

247 lines
7.5 KiB
Python

import os
import sys
import asyncio
import json
from typing import List, Dict, Any, Optional
import chromadb
from chromadb.config import Settings
from src import models
from src import mcp_client
try:
from sentence_transformers import SentenceTransformer
except ImportError:
SentenceTransformer = None
from google import genai
from google.genai import types
from src import ai_client
class BaseEmbeddingProvider:
    """Interface for objects that turn a batch of strings into vectors.

    Concrete providers override :meth:`embed`; this base class only defines
    the contract.
    """

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding vector per entry of ``texts``.

        Raises:
            NotImplementedError: always; subclasses must provide the logic.
        """
        raise NotImplementedError
class LocalEmbeddingProvider(BaseEmbeddingProvider):
    """Embeds text on the local machine with a sentence-transformers model."""

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        """Load the sentence-transformers model named ``model_name``.

        Raises:
            ImportError: when the optional ``sentence_transformers`` import at
                module load time failed (the name is bound to ``None``).
        """
        missing_dependency = SentenceTransformer is None
        if missing_dependency:
            raise ImportError("sentence-transformers is not installed")
        self.model = SentenceTransformer(model_name)

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Encode ``texts`` and return the vectors as nested Python lists."""
        return self.model.encode(texts).tolist()
class GeminiEmbeddingProvider(BaseEmbeddingProvider):
    """Embedding provider backed by the Gemini ``embed_content`` API.

    Uses the shared Gemini client managed by ``ai_client`` and tags every
    request with task type ``RETRIEVAL_DOCUMENT``.
    """

    def __init__(self, model_name: str = 'text-embedding-004', batch_size: int = 100):
        """
        Args:
            model_name: Gemini embedding model to call.
            batch_size: Maximum number of texts sent in one ``embed_content``
                request; longer inputs are split into several requests.
                NOTE(review): the Gemini API caps batch embedding requests at
                100 inputs — confirm against the current API limits.
        """
        self.model_name = model_name
        # Guard against 0/negative values, which would break range() stepping.
        self.batch_size = max(1, int(batch_size))

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding vector per entry of ``texts``, in input order.

        An empty ``texts`` returns ``[]`` without contacting the API.

        Raises:
            ValueError: if the shared Gemini client could not be initialised.
        """
        if not texts:
            return []
        ai_client._ensure_gemini_client()
        client = ai_client._gemini_client
        if not client:
            raise ValueError("Gemini client not initialized")
        # NOTE(review): RAGEngine.search() also embeds queries through this
        # method, so queries are tagged RETRIEVAL_DOCUMENT, not RETRIEVAL_QUERY.
        config = types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT")
        vectors: List[List[float]] = []
        # Send the texts in slices so large files do not exceed the per-request cap.
        for start in range(0, len(texts), self.batch_size):
            res = client.models.embed_content(
                model=self.model_name,
                contents=texts[start:start + self.batch_size],
                config=config,
            )
            vectors.extend(e.values for e in res.embeddings)
        return vectors
class RAGEngine:
    """Retrieval-augmented-generation index over project files.

    Files are chunked, embedded with the configured embedding provider and
    stored in the vector store selected by ``config.vector_store.provider``:

    * ``'chroma'`` -- a persistent ChromaDB collection stored under
      ``<base_dir>/.slop_cache/chroma_db``.
    * ``'mock'``   -- no storage; indexing calls are no-ops and searches return
      nothing.
    * ``'mcp'``    -- searches are delegated to an MCP tool
      (``vector_store.mcp_tool``, default ``"rag_search"``); there is no local
      collection, so local indexing calls are no-ops.

    When ``config.enabled`` is false nothing is initialised and every operation
    is a no-op (searches return ``[]``, ``is_empty`` returns ``True``).
    """

    # Top-level tree-sitter node types that become their own chunk in _chunk_code.
    # 'decorated_definition' wraps functions/classes that carry decorators.
    _CODE_DEFINITION_TYPES = ("function_definition", "class_definition", "decorated_definition")

    def __init__(self, config: "models.RAGConfig", base_dir: str = "."):
        """
        Args:
            config: RAG settings (enabled flag, providers, chunking parameters).
            base_dir: Root directory; indexed paths are resolved relative to it
                and the Chroma database lives beneath it.

        Raises:
            ValueError: for an unknown embedding or vector-store provider.
        """
        self.config = config
        self.base_dir = base_dir
        self.client = None
        self.collection = None
        self.embedding_provider = None
        if not self.config.enabled:
            return
        self._init_embedding_provider()
        self._init_vector_store()

    def _init_embedding_provider(self):
        """Instantiate the embedder named by ``config.embedding_provider``.

        Raises:
            ValueError: if the name is neither ``'gemini'`` nor ``'local'``.
        """
        if self.config.embedding_provider == 'gemini':
            self.embedding_provider = GeminiEmbeddingProvider()
        elif self.config.embedding_provider == 'local':
            self.embedding_provider = LocalEmbeddingProvider()
        else:
            raise ValueError(f"Unknown embedding provider: {self.config.embedding_provider}")

    def _init_vector_store(self):
        """Create the client/collection for ``config.vector_store.provider``.

        Raises:
            ValueError: if the provider is not ``'chroma'``, ``'mock'`` or ``'mcp'``.
        """
        vs_config = self.config.vector_store
        if vs_config.provider == 'chroma':
            db_path = os.path.join(self.base_dir, ".slop_cache", "chroma_db")
            os.makedirs(db_path, exist_ok=True)
            self.client = chromadb.PersistentClient(path=db_path)
            self.collection = self.client.get_or_create_collection(name=vs_config.collection_name)
        elif vs_config.provider == 'mock':
            self.client = "mock"
            self.collection = "mock"
        elif vs_config.provider == 'mcp':
            # Remote store: search() routes through _search_mcp, so there is no
            # local client/collection. Leaving them None disables local indexing.
            self.client = None
            self.collection = None
        else:
            raise ValueError(f"Unknown vector store provider: {vs_config.provider}")

    def _has_local_store(self) -> bool:
        """Return True when a real local collection is available for reads/writes.

        False when disabled, for the 'mock' provider (string sentinel ``"mock"``)
        and for the 'mcp' provider (no collection).
        """
        return (
            bool(self.config.enabled)
            and self.collection is not None
            and not isinstance(self.collection, str)
        )

    def is_empty(self) -> bool:
        """Return True when there is nothing to search locally."""
        if not self.config.enabled:
            return True
        provider = self.config.vector_store.provider
        if provider == 'mcp':
            # NOTE(review): the remote store's size cannot be checked from here;
            # report non-empty so callers that gate search on is_empty() still
            # query it. Confirm against callers.
            return False
        if provider == 'mock' or not self._has_local_store():
            return True
        return self.collection.count() == 0

    def add_documents(self, ids: List[str], texts: List[str], metadatas: Optional[List[Dict[str, Any]]] = None):
        """Embed ``texts`` and upsert them into the collection under ``ids``.

        No-op when there is no local store or when ``ids`` is empty (avoids an
        embedding call and an empty upsert).
        """
        if not self._has_local_store() or not ids:
            return
        embeddings = self.embedding_provider.embed(texts)
        self.collection.upsert(
            ids=ids,
            embeddings=embeddings,
            documents=texts,
            metadatas=metadatas
        )

    def _chunk_text(self, content: str) -> List[str]:
        """Split ``content`` into fixed-size character windows with overlap.

        Windows are ``config.chunk_size`` characters long and consecutive
        windows share ``config.chunk_overlap`` characters; the final window may
        be shorter. Empty content yields ``[]``.

        Raises:
            ValueError: if ``chunk_size`` is not positive or ``chunk_overlap`` is
                not smaller than ``chunk_size`` -- the window would never advance
                and the loop would run forever.
        """
        if not content:
            return []
        size = self.config.chunk_size
        overlap = self.config.chunk_overlap
        step = size - overlap
        if size <= 0 or step <= 0:
            raise ValueError(
                f"Invalid chunking config: chunk_size={size}, chunk_overlap={overlap} "
                "(need chunk_size > 0 and chunk_overlap < chunk_size)"
            )
        chunks = []
        for start in range(0, len(content), step):
            chunks.append(content[start:start + size])
            if start + size >= len(content):
                break
        return chunks

    def _chunk_code(self, content: str, file_path: str) -> List[str]:
        """AST-aware chunking for Python code.

        Each top-level function/class (decorated or not) becomes one chunk. The
        remaining top-level code (imports, constants, statements) is joined and
        split with :meth:`_chunk_text`. Falls back to :meth:`_chunk_text` for the
        whole file when it is shorter than ``chunk_size``, when no definitions are
        found, or when parsing fails for any reason.
        """
        try:
            from src.file_cache import ASTParser
            parser = ASTParser("python")
            tree = parser.parse(content)
            # Node start_byte/end_byte are byte offsets, so slice the UTF-8
            # encoding rather than the str; otherwise any non-ASCII text earlier
            # in the file shifts every chunk.
            # NOTE(review): assumes ASTParser parses the UTF-8 encoding of `content`.
            source = content.encode("utf-8")
            definitions = []
            module_level = []
            for node in tree.root_node.children:
                text = source[node.start_byte:node.end_byte].decode("utf-8", errors="replace")
                if node.type in self._CODE_DEFINITION_TYPES:
                    definitions.append(text)
                else:
                    module_level.append(text)
            if not definitions or len(content) < self.config.chunk_size:
                return self._chunk_text(content)
            # Keep module-level code searchable instead of dropping it.
            rest = "\n".join(module_level).strip()
            if rest:
                definitions.extend(self._chunk_text(rest))
            return definitions
        except Exception:
            return self._chunk_text(content)

    def index_file(self, file_path: str):
        """Read, chunk, embed and store one file, skipping it if unchanged.

        ``file_path`` is resolved against ``base_dir`` and is also stored as the
        ``path`` metadata of every chunk. The file is re-indexed only when its
        mtime differs from the one stored with its existing chunks; old chunks
        are deleted before new ones are added. Missing or unreadable files are
        skipped silently.
        """
        if not self._has_local_store():
            return
        full_path = os.path.join(self.base_dir, file_path)
        if not os.path.exists(full_path):
            return
        try:
            mtime = os.path.getmtime(full_path)
        except OSError:
            return
        # Incremental check: skip if the stored chunks carry the same mtime.
        # Best-effort -- if the lookup fails we simply re-index.
        try:
            res = self.collection.get(where={"path": file_path}, limit=1, include=["metadatas"])
            if res and res["metadatas"] and res["metadatas"][0]:
                if res["metadatas"][0].get("mtime") == mtime:
                    return
        except Exception:
            pass
        try:
            with open(full_path, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
        except OSError:
            return
        # Remove old entries for this file before writing the new chunks.
        self.collection.delete(where={"path": file_path})
        if file_path.lower().endswith(".py"):
            chunks = self._chunk_code(content, file_path)
        else:
            chunks = self._chunk_text(content)
        if not chunks:
            return
        ids = [f"{file_path}_{i}" for i in range(len(chunks))]
        metadatas = [{"path": file_path, "chunk": i, "mtime": mtime} for i in range(len(chunks))]
        self.add_documents(ids, chunks, metadatas)

    def _search_mcp(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Delegate a search to the configured MCP tool.

        Calls ``vector_store.mcp_tool`` (default ``"rag_search"``) with
        ``{"query": query, "top_k": top_k}`` and parses the reply as JSON: a list
        is returned as-is, a dict with a list under ``"results"`` yields that
        list, and anything else (including unparsable replies) yields ``[]``.

        NOTE(review): ``asyncio.run`` raises RuntimeError when called while an
        event loop is already running in this thread.
        """
        tool_name = self.config.vector_store.mcp_tool or "rag_search"
        args = {"query": query, "top_k": top_k}
        res_str = asyncio.run(mcp_client.async_dispatch(tool_name, args))
        try:
            data = json.loads(res_str)
        except (TypeError, ValueError):
            # ValueError covers json.JSONDecodeError; TypeError covers non-string replies.
            return []
        if isinstance(data, list):
            return data
        if isinstance(data, dict) and isinstance(data.get("results"), list):
            return data["results"]
        return []

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``top_k`` chunks most similar to ``query``.

        Each hit is a dict with ``id``, ``document``, ``metadata`` (``{}`` when
        the chunk has none) and ``distance`` (``0.0`` when unavailable). Returns
        ``[]`` when disabled, for the mock store, or when ``top_k`` is not positive.
        """
        if not self.config.enabled or top_k <= 0:
            return []
        if self.config.vector_store.provider == 'mcp':
            return self._search_mcp(query, top_k)
        if not self._has_local_store():
            return []
        query_embedding = self.embedding_provider.embed([query])[0]
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k
        )
        ret = []
        if results and results["ids"] and results["ids"][0]:
            metadatas = results["metadatas"][0] if results.get("metadatas") else None
            distances = results["distances"][0] if results.get("distances") else None
            for i, doc_id in enumerate(results["ids"][0]):
                ret.append({
                    "id": doc_id,
                    "document": results["documents"][0][i],
                    "metadata": (metadatas[i] if metadatas else None) or {},
                    "distance": distances[i] if distances else 0.0,
                })
        return ret

    def delete_documents(self, ids: List[str]):
        """Delete chunks by id; no-op without a local store or with no ids."""
        if not self._has_local_store() or not ids:
            return
        self.collection.delete(ids=ids)

    def get_all_indexed_paths(self) -> List[str]:
        """Return the sorted, de-duplicated ``path`` values of all stored chunks.

        Chunks stored without metadata (or without a ``path``) are ignored.
        """
        if not self._has_local_store():
            return []
        res = self.collection.get(include=["metadatas"])
        if not res or not res["metadatas"]:
            return []
        return sorted({m["path"] for m in res["metadatas"] if m and m.get("path")})

    def delete_documents_by_path(self, file_paths: List[str]):
        """Delete every chunk whose ``path`` metadata is in ``file_paths``."""
        if not self._has_local_store():
            return
        for path in file_paths:
            self.collection.delete(where={"path": path})