From db118f0a5c36c3e0e5fd299ae9215251afe49a66 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Sat, 28 Feb 2026 07:51:02 -0500 Subject: [PATCH] updates to tools and mma skills --- .gemini/agents/tier1-orchestrator.md | 6 + .gemini/agents/tier2-tech-lead.md | 6 + .gemini/agents/tier3-worker.md | 6 + .gemini/agents/tier4-qa.md | 6 + .gemini/tools.json | Bin 6406 -> 29338 bytes ..._outline.json => py_get_code_outline.json} | 4 +- ...hon_skeleton.json => py_get_skeleton.json} | 4 +- .../plan.md | 2 +- conductor/workflow.md | 7 +- mma-orchestrator/SKILL.md | 12 +- scripts/ai_style_formatter.py | 229 ++++---- scripts/check_hints.py | 34 +- scripts/check_hints_v2.py | 50 +- scripts/cli_tool_bridge.py | 262 +++++---- scripts/inject_tools.py | 289 ++++++++++ scripts/mma_exec.py | 502 ++++++++---------- scripts/slice_tools.py | 64 +++ scripts/temp_def.py | 97 ++++ scripts/tool_call.py | 68 ++- scripts/tool_discovery.py | 69 ++- scripts/type_hint_scanner.py | 3 +- 21 files changed, 1070 insertions(+), 650 deletions(-) rename .gemini/tools/{get_code_outline.json => py_get_code_outline.json} (83%) rename .gemini/tools/{get_python_skeleton.json => py_get_skeleton.json} (82%) create mode 100644 scripts/inject_tools.py create mode 100644 scripts/slice_tools.py create mode 100644 scripts/temp_def.py diff --git a/.gemini/agents/tier1-orchestrator.md b/.gemini/agents/tier1-orchestrator.md index b8238e1..f47685a 100644 --- a/.gemini/agents/tier1-orchestrator.md +++ b/.gemini/agents/tier1-orchestrator.md @@ -15,6 +15,12 @@ tools: - discovered_tool_fetch_url - activate_skill - discovered_tool_run_powershell + - discovered_tool_py_find_usages + - discovered_tool_py_get_imports + - discovered_tool_py_check_syntax + - discovered_tool_py_get_hierarchy + - discovered_tool_py_get_docstring + - discovered_tool_get_tree --- STRICT SYSTEM DIRECTIVE: You are a Tier 1 Orchestrator. Focused on product alignment, high-level planning, and track initialization. diff --git a/.gemini/agents/tier2-tech-lead.md b/.gemini/agents/tier2-tech-lead.md index 7924ce4..d674701 100644 --- a/.gemini/agents/tier2-tech-lead.md +++ b/.gemini/agents/tier2-tech-lead.md @@ -17,6 +17,12 @@ tools: - discovered_tool_fetch_url - activate_skill - discovered_tool_run_powershell + - discovered_tool_py_find_usages + - discovered_tool_py_get_imports + - discovered_tool_py_check_syntax + - discovered_tool_py_get_hierarchy + - discovered_tool_py_get_docstring + - discovered_tool_get_tree --- STRICT SYSTEM DIRECTIVE: You are a Tier 2 Tech Lead. Focused on architectural design and track execution. diff --git a/.gemini/agents/tier3-worker.md b/.gemini/agents/tier3-worker.md index 4889193..42fbf3b 100644 --- a/.gemini/agents/tier3-worker.md +++ b/.gemini/agents/tier3-worker.md @@ -17,6 +17,12 @@ tools: - discovered_tool_fetch_url - activate_skill - discovered_tool_run_powershell + - discovered_tool_py_find_usages + - discovered_tool_py_get_imports + - discovered_tool_py_check_syntax + - discovered_tool_py_get_hierarchy + - discovered_tool_py_get_docstring + - discovered_tool_get_tree --- STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor). Your goal is to implement specific code changes or tests based on the provided task. diff --git a/.gemini/agents/tier4-qa.md b/.gemini/agents/tier4-qa.md index fd16830..424176b 100644 --- a/.gemini/agents/tier4-qa.md +++ b/.gemini/agents/tier4-qa.md @@ -15,6 +15,12 @@ tools: - discovered_tool_fetch_url - activate_skill - discovered_tool_run_powershell + - discovered_tool_py_find_usages + - discovered_tool_py_get_imports + - discovered_tool_py_check_syntax + - discovered_tool_py_get_hierarchy + - discovered_tool_py_get_docstring + - discovered_tool_get_tree --- STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent. Your goal is to analyze errors, summarize logs, or verify tests. diff --git a/.gemini/tools.json b/.gemini/tools.json index 8b44f602917ed7e0972cf2198a2b2eb58ff2cfd0..ef0b20053e2220406cefa3c52ce7c1f983677d75 100644 GIT binary patch literal 29338 zcmeI5TW=f55ryYDK>h|uzFFAt5?-=brP<8TRii^zMWS`)y(cVuP*HGnU&)u>@7+d*_Ai;eq<$GT1hAN z?s;`kJ*@sv{n_3hg{Lm;j;Y-_vQMw<@1gyA&7)&+U~~ytcm=Nx#PS{iXH&)b5^DS9S;W&g@%Sb73WptaQpe z4zIL^{-{?oyLx5!T-xi@zEw-=JPpap-kU&eIbYNt-xsPWC` zPjh%-EjhQld{$#OKx`t*cGjAv$6=dr!cTg&$+G^TB>;4a-+wubRUQl>ToO|6Mf zu9mBN)+Z-~0UgEXG-pMEmzH>&%Mlhqs2;kF{?<7PF@w zQ_jDvRY(tVpfQR%mzKF>Gvy;$*7=XDzF2!&Yd-&V{XMZ(FydothuVWqFj7~w(ccMj z-LA<`7G*_^oZF}z+DxK#XwRfpi)5+PT%O@cP$24Uy@lxOzDYdyP?w|AxAxxUYE=Dp zi4b=e^?zgYlD6!)`q^e8)qZ4vk@2~IS$(jXpf$`H-wLZrERpnH24uMm_YOLoBUmoo zU3J*C!RN*2p)MPKhfDa^q_tQmtN>^qEiagykM%l@-A~FvYlV<*Pmz8s$wjC~h%Afy zI%vYgu4_JIhtS#F0(aBnODpX>wBaIX9-4M!sDXvcvD)jSh0O2Upp(1R@9gKU{ob*1 zuBzRY8ci>SOlf~7_T0!y`(V9>X;3qk)vbpkOxg26%X7*5+*+P$L@tHxdf(h)x4152 z+hZ_~TC@z?g|&JeyIqrJFar&7Jwdn7lkG5?O=sXHn9n+X z-xW(A*__}B;E}-MPXji}!+Rtw)Fzw`zxfM2Dt zr;Uu)@9h_c_R^R>O!IwcmwR}WAdWY&YqEZn;G-NZt8P2CptfdVjxTqo4bj zx7Vvny?UepWZe!L-UQ0QgT~8P#ysd8+J|MBcIJyI#+7A_mO7TkW>VCNej(^^u+%>E z?IPeK7~+0kiAT|2D=`QxJ+?l=%~THf6t`gR8GCV zNkGKXcg6PJ2|ufuC+Gx{fL%gN{NvM5CXp{N2s9G|d6l%p*>#&e5GJPfKqya{U^!(f z^1?!ZP`o6Z@XN z!5Eql2t=&&63Lcgg-3?nCx+(o$2T5}Fd$)h7ens36ql*|Dh7URLr%F?*7 zf45`vcYC7IXid51f@Wcf>(aLxSN7}~yawqh*OTbuRGd5_n*vtVS*;MnF_K4td^ zkh3@Bc~+BLuWgeaerKzn6O$YK60M@Z;Amg!^G;1FPXofwR^UNfpTQYcaBpLO)wZ_2 zf|iUmHvQ7ZNKxYLVPe||i|3b8)pmD%K1zJ=W91Kt>Jw|D&(sZZnZKOFq#TQw&U?FF zk8ADQjDx0;BcdTh#Kg&2`5ib*vy5$mJZ&D%fUU}mNm0CsS!s;xx=ge(UzJZwsz=Ul z&X)9ztY_7Y^ z*y>_A`ohZtlOp6T=00ex(q%GgjO#|JHIA)FwX*wIk;^l#cB}g)*IIx4(O%K7JK-<& z5NjMSTd=#7S)yT->1&j?4Y7U9>UN|*EG07ZBIr5M*gg=o<+RD?TdR#XEKnHjly%xYmpqns5$J+NOwBOO} zk(BN252HP@U0RSvaB#)bDFGOU#x zd2wr4<+M`tEg$(|UD>(hy4n4jd1Q&VF1M?(7THU)9lNl^+ZPG`9DL9tyLCM1iZz?@ zz|D{-<~Mf7pMk;Uc&Q!M^)*tiq&LzT`{D5m~P@i)ByxXMWMUD0h-}MP-k7Dt?*o zNo_;fSUV9+{AA?N=hgGb>zPUTuZoE9+f?SodHe@uE^Ese37`aom@xLgW?Z{l} z=jZk$%liArhj*rFvqk()BLe#RK8sSO*7*^R5pEF`SCy{f+2Ogd8UJ=QvQ_T9XcA-`VlC88QqS zKWKc@87^&>eRGX(9@pr3HCw9f<&d;{UtRPm_secC`?>Dy+yst#B%RDZKevQTimnW4 z&$v&z>&%&w$|a+NtP1ifQVtYzE|dkrU95}7Q&Y(LXxquboxT&5n7Oh&=n=a%>KP%5 z@2}hSB{rva#H^6P4=W|24?k>^`LD0_i`BGgk>_`hgCxB(N#0gW>92teuo~!+3Ns+S zTSnm@SKn9nst3!H1^n}Ws_z!xNk8BmM2*<#uH0N?er}RaY<^@VAoYi~mULp;h!$u~ zf4A}ySDjAKHln^`eETE%*!rhkC`}`JW-GAn;Spci-g?DNAT{T$P0XW(^Wxircco6Q z^JDjuXORtMBB|s&C**<25PRhO^^r*(61Hb0G~qmQ#xA-)*>BoH$^JZ%NXK95M}}E5 z*B-&=R{E30dykXNObt$cn1;0I8Au)H4;? zeJ9)TU zCkgk7GkJDvj7>{#uqnP8B7Gr;OLslC{4UN6p$4RUzOWZ{W_4)xLp$Zkcda8%U6ar$ zbQteMdADe$Sf0+ia+?=-1Lnx{Od-`)Dio<3h!S50pGX;)4=symKU{ri_R8aZ2X-&fWi*Z^vGG#wg8@=Y$ZX zk5;)~#wVVd8ndN!#3EjYUh@vj@{y*8STmgIdHonX^f{7%9Da@@0R9b9kF#gJnkBdJozycFM=vtwEa)81n(P=vh@C>AKpPE`E7`qT$w!K`+VpeGubnX>nEv=7AK8Us*uP_70*$9$TcxOB)`ng z)EVes0(U8AM$(3zMuxWM^Zarp+VQ+R*Q@*4^89B5@LCaK+12vwb@w3gS9DeiC(g(0 zqLLe?95CKBNP07)+622FnEk_=#?>(GKtcbSSe+PoJJV@y$xbq-aZl*>^t)~K+t+nK zTHo2)-q#Yn`n6i!MzU;6e>Tn$p`K}4a)xz0;rkF}!~;XxQe3E)?VQ@UB|3Guooe{tsiRB@4 zpLTBF(;z9tG1#AxN{q{scoORlL@J1NBULdMVpRR{E6WGqMA~Us0U<8W2(x0rTHMJs zw3@QqWA*a*Ur;I!eUATeE}kgI3^N1G9)vSC|8H2U)*UFFwRz+Pmf)w)@t!M=ijhGu zzj434BRtH-BT*$Tc{EAq86zd{0~?7a_moetrlITtq>4znyrQya)X7>4nK{b**GbD{ zoY8BnAv{X!G2T9X%gnLHL6nU)@z)X7*F*Skc>e$H$oGk3&fT6}@#S);RJyw3OkZ5% vU?IoPY^J19?HV9CU~L}m%e-nusf&q5WVMD3=9-KST5C|)BC~ie@ncbxowMc(^-^`LU z?O3T{$3W79ZJONAnKy6V%-csqf8&Q}uQqGmJD@$!CZ89!;=O75kaNhqZK8Fxd_Tc= ziX~IgRu!F}9sck$Wv-4~M{)%jlDV|8v9z_0Ggaj_oKxf7pPWT%Of18U&I8kuqi!`1 z?Rste7IS@w6dmQ9*&NH1jIBIvT}w4%kzwO*%UOTQ#&}#HvJbxnSD$7juMLiyE{E&I zZKKz73pqqL3-58NhJCagMx*6DPI=|cXpIoPt(U_ZYQAe$dbS9zY9o{La1)2tZaz$94lne z`E75mYsWppAaz_IQ|A=gMLz?E>78k6>V4UN-OlK=belkeo+*iuXF6eAu_(_;27 zPVNRIg4~%uG?nwXK2X%70yb^4*n0lzJu1lPy*SuGUWlb1)Rlpa&q|G$u)~q3I)nr=VJgmv(|?WyD~4}mMf;olP6D~%$n`ws$&Za z@T7egTeumd5I`Fe1OkwL^KaM1$pfA7#=h>uH8F=+>K^3ra^$Ef@oYJR!NuCs&hXK# zEBPn*?5TRfhspRf9WmUBL~c0wt$5`+ckck?OOAkE=GKh?_m|vyM2XEnF^Tc7rVQ6c zWkdn21^zV0^i)rWz|rwGmaaY^yHJFI|p@qOCK@R(QnJ5KSGCZ)Eb> zA8(EhD4GShcg5T8?Hl7)x!YjT5@d|6@%|4kcdrk-7w8jrw~YR$Os}0=Zd~B$V}oNW7H9NP>%U(u~d| z|9D200goY-_1Mug)uV=jUDif2QA&K0rL3piWhw0gRSQLFQ<)57hItE@_sNnbv)Sw# zH@0;SL;;LBtzF&%ffl047rKbZr7`wZA&9;L%qw0)TR}m%`fDZ5<55ZkPbG6dG%_-m z^BhK#5dADHO3Pjnr_D@7-Gqb=S>h&a44l$C7fx5lC5lo0?{J|6;Z3~+6lBQ4TP{Xu z8Ic9J0a7koRa<1`Mo!YHAH{?Le0gatzbC0qz$Lcj^BeG3>E>`MF44<7KvokK4w zfRM)z&Ko&Hj94vNc2mX>X2G>S6QTzmG$RdWS14TOl&NT$lxaZp{T3ayfL+#-4!D5B ztQ17CglHCtFRql`Hl;)4Yl+k)h*j-2cx|GjiwR~|&aGrPGRwE%>zm?zq$#?xvaiFm zR_bf9Thvi!u$f9nctz}|?hqgJktrD$l#Zyo=&Fp#b5R7pLvPNKqvo=}L^W}z5$=k< z>-V~~P+D54!HQxBPn~o*Np6UtBt!MQT{VsS4wc4LLP?5*GT=-YXh0s~?RriD9#I7< z_Pz-c(IB~U^HD|_d17G#&tZpq2<~671W|jP2+e`U#m#vN5g_)O3PViZ-HJoznTqQI~5&ZJUNgxj!$Bwvs@v zI<{bxwDZZD0nc|eBUxUD3?v8yO3FTd0pxbG?b2TE8ZE*w8L|UVn!uZ zt+FD@b*Yyw8+q^r?F&c454ohpS7&dIP(3XJkT2~!?dO;RDwZyBGB^coFTOzS1)Ymj zhCYOJ0t)ZeDG~DG!Tbbh+|x^`w~6~BuGNlraFv|mR*sS~r?qpmA=OTG_IWuQYSV<$ zX}48VVp3Ri{+dQhh)D4^LonGot6CKxhsUQ=I+Q*somu?aUkCK)o*$nhY!NuL|6SLz zdtEmWr$=U~Lq^VqIdW^3wS)JuUqENK<`9G$FWB2+3*ROsqjBnw=OjS zAwOTWQwG@=(G%s|#wo~7M2O~)Tq@Zv4{OQLr*>42A|xRaomU0ef7lYJFQAbE^Y@#P zY0(8jP@$-)(1r~$*MdU2w-J>la+NkFq}UakJMBgoWGZZssP_vSeP{&iB~#rq1V1%B W?!X_=kc@ZfI&^3HG$0?nd-N~w=4HzO diff --git a/.gemini/tools/get_code_outline.json b/.gemini/tools/py_get_code_outline.json similarity index 83% rename from .gemini/tools/get_code_outline.json rename to .gemini/tools/py_get_code_outline.json index 1bbe8ce..cc60f47 100644 --- a/.gemini/tools/get_code_outline.json +++ b/.gemini/tools/py_get_code_outline.json @@ -1,5 +1,5 @@ { - "name": "get_code_outline", + "name": "py_get_code_outline", "description": "Get a hierarchical outline of a code file. This returns classes, functions, and methods with their line ranges and brief docstrings. Use this to quickly map out a file's structure before reading specific sections.", "parameters": { "type": "object", @@ -13,5 +13,5 @@ "path" ] }, - "command": "python scripts/tool_call.py get_code_outline" + "command": "python scripts/tool_call.py py_get_code_outline" } diff --git a/.gemini/tools/get_python_skeleton.json b/.gemini/tools/py_get_skeleton.json similarity index 82% rename from .gemini/tools/get_python_skeleton.json rename to .gemini/tools/py_get_skeleton.json index ee1dd10..66a6a3f 100644 --- a/.gemini/tools/get_python_skeleton.json +++ b/.gemini/tools/py_get_skeleton.json @@ -1,5 +1,5 @@ { - "name": "get_python_skeleton", + "name": "py_get_skeleton", "description": "Get a skeleton view of a Python file. This returns all classes and function signatures with their docstrings, but replaces function bodies with '...'. Use this to understand module interfaces without reading the full implementation.", "parameters": { "type": "object", @@ -13,5 +13,5 @@ "path" ] }, - "command": "python scripts/tool_call.py get_python_skeleton" + "command": "python scripts/tool_call.py py_get_skeleton" } diff --git a/conductor/archive/mma_utilization_refinement_20260226/plan.md b/conductor/archive/mma_utilization_refinement_20260226/plan.md index 4418edf..c8cdaf1 100644 --- a/conductor/archive/mma_utilization_refinement_20260226/plan.md +++ b/conductor/archive/mma_utilization_refinement_20260226/plan.md @@ -7,7 +7,7 @@ - [ ] Task: Conductor - User Manual Verification 'Phase 1' (Protocol in workflow.md) ## Phase 2: AST Skeleton Extraction (Skeleton Views) -- [x] Task: Enhance `mcp_client.py` with `get_python_skeleton` functionality using `tree-sitter` to extract signatures and docstrings. e950601 +- [x] Task: Enhance `mcp_client.py` with `py_get_skeleton` functionality using `tree-sitter` to extract signatures and docstrings. e950601 - [x] Task: Update `mma_exec.py` to utilize these skeletons for non-target dependencies when preparing context for Tier 3. e950601 - [x] Task: Integrate "Interface-level" scrubbed versions into the sub-agent injection logic. e950601 - [ ] Task: Conductor - User Manual Verification 'Phase 2' (Protocol in workflow.md) diff --git a/conductor/workflow.md b/conductor/workflow.md index 21e7089..406cf1a 100644 --- a/conductor/workflow.md +++ b/conductor/workflow.md @@ -9,7 +9,7 @@ 5. **User Experience First:** Every decision should prioritize user experience 6. **Non-Interactive & CI-Aware:** Prefer non-interactive commands. Use `CI=true` for watch-mode tools (tests, linters) to ensure single execution. 7. **MMA Tiered Delegation is Mandatory:** The Conductor acts as a Tier 1/2 Orchestrator. You MUST delegate all non-trivial coding to Tier 3 Workers and all error analysis to Tier 4 QA Agents. Do NOT perform large file writes directly. -8. **Mandatory Research-First Protocol:** Before reading the full content of any file over 50 lines, you MUST use `get_file_summary`, `get_python_skeleton`, or `get_code_outline` to map the architecture and identify specific target ranges. Use `get_git_diff` to understand recent changes. +8. **Mandatory Research-First Protocol:** Before reading the full content of any file over 50 lines, you MUST use `get_file_summary`, `py_get_skeleton`, `py_get_code_outline`, or `py_get_docstring` to map the architecture and identify specific target ranges. Use `get_git_diff` to understand recent changes. Use `py_find_usages` to locate where symbols are used. ## Task Workflow @@ -24,11 +24,10 @@ All tasks follow a strict lifecycle: 2. **Mark In Progress:** Before beginning work, edit `plan.md` and change the task from `[ ]` to `[~]` 3. **High-Signal Research Phase:** - - **Identify Dependencies:** Use `list_directory` and `grep_search` to find relevant files. - - **Map Architecture:** Use `get_code_outline` or `get_python_skeleton` on identified files to understand their structure. + - **Identify Dependencies:** Use `list_directory`, `get_tree`, and `py_get_imports` to map file relations. + - **Map Architecture:** Use `py_get_code_outline` or `py_get_skeleton` on identified files to understand their structure. - **Analyze Changes:** Use `get_git_diff` if the task involves modifying recently updated code. - **Minimize Token Burn:** Only use `read_file` with `start_line`/`end_line` for specific implementation details once target areas are identified. - 4. **Write Failing Tests (Red Phase):** - **Delegate Test Creation:** Do NOT write test code directly. Spawn a Tier 3 Worker (`python scripts/mma_exec.py --role tier3-worker "[PROMPT]"`) with a prompt to create the necessary test files and unit tests based on the task criteria. - Take the code generated by the Worker and apply it. diff --git a/mma-orchestrator/SKILL.md b/mma-orchestrator/SKILL.md index 80a6085..c82ef2f 100644 --- a/mma-orchestrator/SKILL.md +++ b/mma-orchestrator/SKILL.md @@ -26,17 +26,19 @@ If you run a test or command that fails with a significant error or large traceb 1. **DO NOT** analyze the raw logs in your own context window. 2. **DO** spawn a stateless Tier 4 agent to diagnose the failure. 3. *Command:* `uv run python scripts/mma_exec.py --role tier4-qa "Analyze this failure and summarize the root cause: [LOG_DATA]"` -4. **Mandatory Research-First Protocol:** Avoid direct `read_file` calls for any file over 50 lines. Use `get_file_summary`, `get_python_skeleton`, or `get_code_outline` first to identify relevant sections. Use `git diff` to understand changes. +4. **Mandatory Research-First Protocol:** Avoid direct `read_file` calls for any file over 50 lines. Use `get_file_summary`, `py_get_skeleton`, or `py_get_code_outline` first to identify relevant sections. Use `git diff` to understand changes. ## 3. Persistent Tech Lead Memory (Tier 2) Unlike the stateless sub-agents (Tiers 3 & 4), the **Tier 2 Tech Lead** maintains persistent context throughout the implementation of a track. Do NOT apply "Context Amnesia" to your own session during track implementation. You are responsible for the continuity of the technical strategy. ## 4. AST Skeleton & Outline Views To minimize context bloat for Tier 2 & 3: -1. Use `get_code_outline` to map out the structure of a file. -2. Use `get_python_skeleton` to understand the interface and docstrings of dependencies. -3. Only use `read_file` with `start_line` and `end_line` for specific implementation details once target areas are identified. -4. Tier 3 workers MUST NOT read the full content of unrelated files. +1. Use `py_get_code_outline` or `get_tree` to map out the structure of a file or project. +2. Use `py_get_skeleton` and `py_get_imports` to understand the interface, docstrings, and dependencies of modules. +3. Use `py_find_usages` to pinpoint where a function or class is called instead of searching the whole codebase. +4. Use `py_check_syntax` after making string replacements to ensure the file is still syntactically valid. +5. Only use `read_file` with `start_line` and `end_line` for specific implementation details once target areas are identified. +6. Tier 3 workers MUST NOT read the full content of unrelated files. ### Example 1: Spawning a Tier 4 QA Agent diff --git a/scripts/ai_style_formatter.py b/scripts/ai_style_formatter.py index 24bc2d9..1a3df36 100644 --- a/scripts/ai_style_formatter.py +++ b/scripts/ai_style_formatter.py @@ -1,125 +1,130 @@ import tokenize import io +import os +import sys def format_code(source: str) -> str: - """ + """ Formats Python code to use exactly 1 space for indentation (including continuations), max 1 blank line between top-level definitions, and 0 blank lines inside function/method bodies. - - Args: - source: The Python source code to format. - - Returns: - The formatted source code. """ - if not source: - return "" - - tokens = list(tokenize.generate_tokens(io.StringIO(source).readline)) - lines = source.splitlines(keepends=True) - num_lines = len(lines) - - block_level = 0 - paren_level = 0 - in_function_stack = [] + if not source: + return "" + try: + tokens = list(tokenize.generate_tokens(io.StringIO(source).readline)) + except tokenize.TokenError: + return source # Return as-is if it's not valid python (e.g. template files) + lines = source.splitlines(keepends=True) + num_lines = len(lines) + block_level = 0 + paren_level = 0 + in_function_stack = [] + expecting_function_indent = False + line_indent = {} + line_is_blank = {i: True for i in range(1, num_lines + 2)} + line_is_string_interior = {i: False for i in range(1, num_lines + 2)} + line_seen = set() + pending_blank_lines = [] + for tok in tokens: + t_type = tok.type + t_string = tok.string + start_line, _ = tok.start + end_line, _ = tok.end + if t_type == tokenize.STRING: + for l in range(start_line + 1, end_line + 1): + line_is_string_interior[l] = True + if t_type not in (tokenize.NL, tokenize.NEWLINE, tokenize.INDENT, tokenize.DEDENT, tokenize.ENDMARKER): + for l in range(start_line, end_line + 1): + line_is_blank[l] = False + pending_blank_lines = [] + if t_type == tokenize.INDENT: + block_level += 1 + if expecting_function_indent: + in_function_stack.append(block_level) expecting_function_indent = False - - line_indent = {} - line_is_blank = {i: True for i in range(1, num_lines + 2)} - line_is_string_interior = {i: False for i in range(1, num_lines + 2)} - - line_seen = set() - pending_blank_lines = [] - - for tok in tokens: - t_type = tok.type - t_string = tok.string - start_line, _ = tok.start - end_line, _ = tok.end - - if t_type == tokenize.STRING: - for l in range(start_line + 1, end_line + 1): - line_is_string_interior[l] = True - - if t_type not in (tokenize.NL, tokenize.NEWLINE, tokenize.INDENT, tokenize.DEDENT, tokenize.ENDMARKER): - for l in range(start_line, end_line + 1): - line_is_blank[l] = False - pending_blank_lines = [] # Real content seen, clear pending blanks + elif t_type == tokenize.DEDENT: + block_level -= 1 + if in_function_stack and block_level < in_function_stack[-1]: + in_function_stack.pop() + for l in pending_blank_lines: + line_indent[l] = block_level + paren_level + if t_string in (')', ']', '}'): + paren_level -= 1 + if start_line not in line_seen: + line_indent[start_line] = block_level + paren_level + if t_type not in (tokenize.INDENT, tokenize.DEDENT): + line_seen.add(start_line) + if t_type in (tokenize.NL, tokenize.NEWLINE): + pending_blank_lines.append(start_line) + if t_type == tokenize.NAME and t_string == 'def': + expecting_function_indent = True + if t_string in ('(', '[', '{'): + paren_level += 1 + output = [] + consecutive_blanks = 0 + for i in range(1, num_lines + 1): + if line_is_string_interior[i]: + output.append(lines[i-1]) + continue + if line_is_blank[i]: + indent = line_indent.get(i, 0) + if indent > 0: + continue + else: + if consecutive_blanks < 1: + output.append("\n") + consecutive_blanks += 1 + continue + original_line = lines[i-1] + indent = line_indent.get(i, 0) + stripped = original_line.lstrip() + is_def_start = stripped.startswith(('def ', 'class ', 'async def ', '@')) + if is_def_start and output and consecutive_blanks == 0: + prev_line = output[-1].strip() + if prev_line and not prev_line.endswith(':') and not prev_line.startswith('@'): + output.append("\n") + consecutive_blanks += 1 + consecutive_blanks = 0 + output.append(" " * indent + stripped) + if not stripped.endswith('\n') and i < num_lines: + output[-1] += '\n' + if output and not output[-1].endswith('\n'): + output[-1] += '\n' + return "".join(output) - # State updates that affect CURRENT line - if t_type == tokenize.INDENT: - block_level += 1 - if expecting_function_indent: - in_function_stack.append(block_level) - expecting_function_indent = False - elif t_type == tokenize.DEDENT: - block_level -= 1 - if in_function_stack and block_level < in_function_stack[-1]: - in_function_stack.pop() - # Retroactively update pending blank lines to the current (outer) level - for l in pending_blank_lines: - line_indent[l] = block_level + paren_level - - if t_string in (')', ']', '}'): - paren_level -= 1 +def process_file(file_path: str, write: bool) -> None: + try: + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + formatted = format_code(content) + if write: + if formatted != content: + with open(file_path, "w", encoding="utf-8") as f: + f.write(formatted) + print(f"Formatted: {file_path}") + else: + sys.stdout.reconfigure(encoding='utf-8') + sys.stdout.write(formatted) + except Exception as e: + print(f"Error processing {file_path}: {e}") - if start_line not in line_seen: - line_indent[start_line] = block_level + paren_level - if t_type not in (tokenize.INDENT, tokenize.DEDENT): - line_seen.add(start_line) - if t_type in (tokenize.NL, tokenize.NEWLINE): - pending_blank_lines.append(start_line) - - # State updates that affect FUTURE lines/tokens - if t_type == tokenize.NAME and t_string == 'def': - expecting_function_indent = True - if t_string in ('(', '[', '{'): - paren_level += 1 - - output = [] - consecutive_blanks = 0 - - for i in range(1, num_lines + 1): - if line_is_string_interior[i]: - output.append(lines[i-1]) - continue - - if line_is_blank[i]: - indent = line_indent.get(i, 0) - if indent > 0: - continue - else: - if consecutive_blanks < 1: - output.append("\n") - consecutive_blanks += 1 - continue - - consecutive_blanks = 0 - original_line = lines[i-1] - indent = line_indent.get(i, 0) - stripped = original_line.lstrip() - - output.append(" " * indent + stripped) - if not stripped.endswith('\n') and i < num_lines: - output[-1] += '\n' - - if output and not output[-1].endswith('\n'): - output[-1] += '\n' - - return "".join(output) +def main() -> None: + import argparse + parser = argparse.ArgumentParser(description="AI-optimized Python code formatter.") + parser.add_argument("paths", nargs="+", help="Files or directories to format.") + parser.add_argument("--write", action="store_true", help="Write changes back to files.") + parser.add_argument("--exclude", nargs="*", default=[".venv", "__pycache__", ".git"], help="Directories to exclude.") + args = parser.parse_args() + for path in args.paths: + if os.path.isfile(path): + process_file(path, args.write) + elif os.path.isdir(path): + for root, dirs, files in os.walk(path): + dirs[:] = [d for d in dirs if d not in args.exclude] + for file in files: + if file.endswith(".py"): + process_file(os.path.join(root, file), args.write) if __name__ == "__main__": - import sys - import os - if len(sys.argv) > 1: - file_path = sys.argv[1] - with open(file_path, "r", encoding="utf-8") as f: - content = f.read() - formatted = format_code(content) - if len(sys.argv) > 2 and sys.argv[2] == "--write": - with open(file_path, "w", encoding="utf-8") as f: - f.write(formatted) - else: - sys.stdout.reconfigure(encoding='utf-8') - sys.stdout.write(formatted) + main() diff --git a/scripts/check_hints.py b/scripts/check_hints.py index 2735bcb..09c5508 100644 --- a/scripts/check_hints.py +++ b/scripts/check_hints.py @@ -3,20 +3,20 @@ import sys files = ['ai_client.py', 'aggregate.py', 'mcp_client.py', 'shell_runner.py'] for file_path in files: - print(f"Checking {file_path}...") - with open(file_path, 'r', encoding='utf-8') as f: - lines = f.readlines() - for i, line in enumerate(lines): - if line.strip().startswith('def '): - if '->' not in line: - # Check next line if it's a multiline def - if '):' not in line: - full_def = line - j = i + 1 - while j < len(lines) and '):' not in lines[j-1]: - full_def += lines[j] - j += 1 - if '->' not in full_def: - print(f" Missing hint at line {i+1}: {line.strip()}") - else: - print(f" Missing hint at line {i+1}: {line.strip()}") + print(f"Checking {file_path}...") + with open(file_path, 'r', encoding='utf-8') as f: + lines = f.readlines() + for i, line in enumerate(lines): + if line.strip().startswith('def '): + if '->' not in line: + # Check next line if it's a multiline def + if '):' not in line: + full_def = line + j = i + 1 + while j < len(lines) and '):' not in lines[j-1]: + full_def += lines[j] + j += 1 + if '->' not in full_def: + print(f" Missing hint at line {i+1}: {line.strip()}") + else: + print(f" Missing hint at line {i+1}: {line.strip()}") diff --git a/scripts/check_hints_v2.py b/scripts/check_hints_v2.py index 1daf17f..86a37e0 100644 --- a/scripts/check_hints_v2.py +++ b/scripts/check_hints_v2.py @@ -3,29 +3,27 @@ import sys files = ['ai_client.py', 'aggregate.py', 'mcp_client.py', 'shell_runner.py'] for file_path in files: - print(f"Checking {file_path}...") - with open(file_path, 'r', encoding='utf-8') as f: - content = f.read() - # Find all function definitions - # This regex is simplified and might miss some edge cases (like multi-line defs) - # But it's better than nothing. - defs = re.finditer(r'def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\((.*?)\)\s*(->\s*.*?)?:', content, re.DOTALL) - for m in defs: - name = m.group(1) - args = m.group(2).strip() - ret = m.group(3) - - if not ret: - print(f" Missing return type: {name}({args})") - - # Check arguments - if args: - arg_list = [a.strip() for a in args.split(',')] - for arg in arg_list: - if not arg or arg == 'self' or arg == 'cls': - continue - if ':' not in arg and '=' not in arg: - print(f" Missing arg type: {name} -> {arg}") - elif ':' not in arg and '=' in arg: - # arg=val (missing type) - print(f" Missing arg type: {name} -> {arg}") + print(f"Checking {file_path}...") + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + # Find all function definitions + # This regex is simplified and might miss some edge cases (like multi-line defs) + # But it's better than nothing. + defs = re.finditer(r'def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\((.*?)\)\s*(->\s*.*?)?:', content, re.DOTALL) + for m in defs: + name = m.group(1) + args = m.group(2).strip() + ret = m.group(3) + if not ret: + print(f" Missing return type: {name}({args})") + # Check arguments + if args: + arg_list = [a.strip() for a in args.split(',')] + for arg in arg_list: + if not arg or arg == 'self' or arg == 'cls': + continue + if ':' not in arg and '=' not in arg: + print(f" Missing arg type: {name} -> {arg}") + elif ':' not in arg and '=' in arg: + # arg=val (missing type) + print(f" Missing arg type: {name} -> {arg}") diff --git a/scripts/cli_tool_bridge.py b/scripts/cli_tool_bridge.py index 8c00437..3790403 100644 --- a/scripts/cli_tool_bridge.py +++ b/scripts/cli_tool_bridge.py @@ -7,151 +7,131 @@ import os # This helps in cases where the script is run from different directories project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if project_root not in sys.path: - sys.path.append(project_root) + sys.path.append(project_root) try: - from api_hook_client import ApiHookClient + from api_hook_client import ApiHookClient except ImportError: - # Fallback if the script is run from the project root directly, - # or if the above path append didn't work for some reason. - try: - from api_hook_client import ApiHookClient - except ImportError: - # Use basic print for fatal errors if logging isn't set up yet - print("FATAL: Failed to import ApiHookClient. Ensure it's in the Python path.", file=sys.stderr) - sys.exit(1) # Exit if the core dependency cannot be imported - +# Fallback if the script is run from the project root directly, +# or if the above path append didn't work for some reason. + try: + from api_hook_client import ApiHookClient + except ImportError: + # Use basic print for fatal errors if logging isn't set up yet + print("FATAL: Failed to import ApiHookClient. Ensure it's in the Python path.", file=sys.stderr) + sys.exit(1) # Exit if the core dependency cannot be imported def main(): - # Setup basic logging to stderr. - # Set level to DEBUG to capture all messages, including debug info. - logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr) - - logging.debug("CLI Tool Bridge script started.") - - try: - # 1. Read JSON from sys.stdin - input_data = sys.stdin.read() - - if not input_data: - logging.debug("No input received from stdin. Exiting gracefully.") - return - - logging.debug(f"Received raw input data: {input_data}") - - try: - hook_input = json.loads(input_data) - except json.JSONDecodeError: - logging.error("Failed to decode JSON from stdin.") - print(json.dumps({ - "decision": "deny", - "reason": "Invalid JSON received from stdin." - })) - return - - # Initialize variables for tool name and arguments - tool_name = None - tool_args = {} - - # 2. Try to parse input in Gemini API format ('name', 'input') - logging.debug("Attempting to parse input in Gemini API format ('name', 'input').") - if 'name' in hook_input and hook_input['name'] is not None: - tool_name = hook_input['name'] - logging.debug(f"Found Gemini API format tool name: {tool_name}") - - if 'input' in hook_input and hook_input['input'] is not None: - if isinstance(hook_input['input'], dict): - tool_args = hook_input['input'] - logging.debug(f"Found Gemini API format tool input: {tool_args}") - else: - logging.warning("Gemini API format 'input' is not a dictionary. Ignoring.") - - # 3. If Gemini format wasn't fully present, try the legacy format ('tool_name', 'tool_input') - if tool_name is None: - logging.debug("Gemini API format not fully detected. Falling back to legacy format ('tool_name', 'tool_input').") - tool_name = hook_input.get('tool_name') - if tool_name: - logging.debug(f"Found legacy format tool name: {tool_name}") - - tool_input_legacy = hook_input.get('tool_input') - if tool_input_legacy is not None: - if isinstance(tool_input_legacy, dict): - tool_args = tool_input_legacy - logging.debug(f"Found legacy format tool input: {tool_args}") - else: - logging.warning("Legacy format 'tool_input' is not a dictionary. Ignoring.") - - # Final checks on resolved tool_name and tool_args - if tool_name is None: - logging.error("Could not determine tool name from input.") - print(json.dumps({ - "decision": "deny", - "reason": "Could not determine tool name from input. Expected 'name' or 'tool_name'." - })) - return - - if not isinstance(tool_args, dict): - logging.error(f"Resolved tool_args is not a dictionary: {tool_args}") - print(json.dumps({ - "decision": "deny", - "reason": "Resolved tool arguments are not in a valid dictionary format." - })) - return - - logging.debug(f"Resolved tool_name: '{tool_name}', tool_args: {tool_args}") - - # 4. Check context — if not running via Manual Slop, we pass through (allow) - # This prevents the hook from affecting normal CLI usage. - hook_context = os.environ.get("GEMINI_CLI_HOOK_CONTEXT") - logging.debug(f"Checking GEMINI_CLI_HOOK_CONTEXT: '{hook_context}'") - if hook_context != "manual_slop": - logging.debug(f"GEMINI_CLI_HOOK_CONTEXT is '{hook_context}', NOT 'manual_slop'. Allowing execution without confirmation.") - print(json.dumps({ - "decision": "allow", - "reason": f"Non-programmatic usage (GEMINI_CLI_HOOK_CONTEXT={hook_context})." - })) - return - - # 5. Use 'ApiHookClient' (assuming GUI is on http://127.0.0.1:8999) - logging.debug("GEMINI_CLI_HOOK_CONTEXT is 'manual_slop'. Proceeding with API Hook Client.") - client = ApiHookClient(base_url="http://127.0.0.1:8999") - - try: - # 6. Request confirmation - # This is a blocking call that waits for the user in the GUI - logging.debug(f"Requesting confirmation for tool '{tool_name}' with args: {tool_args}") - response = client.request_confirmation(tool_name, tool_args) - - if response and response.get('approved') is True: - # 7. Print 'allow' decision - logging.debug("User approved tool execution.") - print(json.dumps({"decision": "allow"})) - else: - # 8. Print 'deny' decision - reason = response.get('reason', 'User rejected tool execution in GUI.') if response else 'No response from GUI.' - logging.debug(f"User denied tool execution. Reason: {reason}") - print(json.dumps({ - "decision": "deny", - "reason": reason - })) - - except Exception as e: - # 9. Handle cases where hook server is not reachable or other API errors - # If we ARE in manual_slop context but can't reach the server, we should DENY - # because the user expects to be in control. - logging.error(f"API Hook Client error: {str(e)}", exc_info=True) - print(json.dumps({ - "decision": "deny", - "reason": f"Manual Slop hook server unreachable or API error: {str(e)}" - })) - - except Exception as e: - # Fallback for unexpected errors during initial processing (e.g., stdin read) - logging.error(f"An unexpected error occurred in the main bridge logic: {str(e)}", exc_info=True) - print(json.dumps({ - "decision": "deny", - "reason": f"Internal bridge error: {str(e)}" - })) +# Setup basic logging to stderr. +# Set level to DEBUG to capture all messages, including debug info. + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr) + logging.debug("CLI Tool Bridge script started.") + try: + # 1. Read JSON from sys.stdin + input_data = sys.stdin.read() + if not input_data: + logging.debug("No input received from stdin. Exiting gracefully.") + return + logging.debug(f"Received raw input data: {input_data}") + try: + hook_input = json.loads(input_data) + except json.JSONDecodeError: + logging.error("Failed to decode JSON from stdin.") + print(json.dumps({ + "decision": "deny", + "reason": "Invalid JSON received from stdin." + })) + return + # Initialize variables for tool name and arguments + tool_name = None + tool_args = {} + # 2. Try to parse input in Gemini API format ('name', 'input') + logging.debug("Attempting to parse input in Gemini API format ('name', 'input').") + if 'name' in hook_input and hook_input['name'] is not None: + tool_name = hook_input['name'] + logging.debug(f"Found Gemini API format tool name: {tool_name}") + if 'input' in hook_input and hook_input['input'] is not None: + if isinstance(hook_input['input'], dict): + tool_args = hook_input['input'] + logging.debug(f"Found Gemini API format tool input: {tool_args}") + else: + logging.warning("Gemini API format 'input' is not a dictionary. Ignoring.") + # 3. If Gemini format wasn't fully present, try the legacy format ('tool_name', 'tool_input') + if tool_name is None: + logging.debug("Gemini API format not fully detected. Falling back to legacy format ('tool_name', 'tool_input').") + tool_name = hook_input.get('tool_name') + if tool_name: + logging.debug(f"Found legacy format tool name: {tool_name}") + tool_input_legacy = hook_input.get('tool_input') + if tool_input_legacy is not None: + if isinstance(tool_input_legacy, dict): + tool_args = tool_input_legacy + logging.debug(f"Found legacy format tool input: {tool_args}") + else: + logging.warning("Legacy format 'tool_input' is not a dictionary. Ignoring.") + # Final checks on resolved tool_name and tool_args + if tool_name is None: + logging.error("Could not determine tool name from input.") + print(json.dumps({ + "decision": "deny", + "reason": "Could not determine tool name from input. Expected 'name' or 'tool_name'." + })) + return + if not isinstance(tool_args, dict): + logging.error(f"Resolved tool_args is not a dictionary: {tool_args}") + print(json.dumps({ + "decision": "deny", + "reason": "Resolved tool arguments are not in a valid dictionary format." + })) + return + logging.debug(f"Resolved tool_name: '{tool_name}', tool_args: {tool_args}") + # 4. Check context — if not running via Manual Slop, we pass through (allow) + # This prevents the hook from affecting normal CLI usage. + hook_context = os.environ.get("GEMINI_CLI_HOOK_CONTEXT") + logging.debug(f"Checking GEMINI_CLI_HOOK_CONTEXT: '{hook_context}'") + if hook_context != "manual_slop": + logging.debug(f"GEMINI_CLI_HOOK_CONTEXT is '{hook_context}', NOT 'manual_slop'. Allowing execution without confirmation.") + print(json.dumps({ + "decision": "allow", + "reason": f"Non-programmatic usage (GEMINI_CLI_HOOK_CONTEXT={hook_context})." + })) + return + # 5. Use 'ApiHookClient' (assuming GUI is on http://127.0.0.1:8999) + logging.debug("GEMINI_CLI_HOOK_CONTEXT is 'manual_slop'. Proceeding with API Hook Client.") + client = ApiHookClient(base_url="http://127.0.0.1:8999") + try: + # 6. Request confirmation + # This is a blocking call that waits for the user in the GUI + logging.debug(f"Requesting confirmation for tool '{tool_name}' with args: {tool_args}") + response = client.request_confirmation(tool_name, tool_args) + if response and response.get('approved') is True: + # 7. Print 'allow' decision + logging.debug("User approved tool execution.") + print(json.dumps({"decision": "allow"})) + else: + # 8. Print 'deny' decision + reason = response.get('reason', 'User rejected tool execution in GUI.') if response else 'No response from GUI.' + logging.debug(f"User denied tool execution. Reason: {reason}") + print(json.dumps({ + "decision": "deny", + "reason": reason + })) + except Exception as e: + # 9. Handle cases where hook server is not reachable or other API errors + # If we ARE in manual_slop context but can't reach the server, we should DENY + # because the user expects to be in control. + logging.error(f"API Hook Client error: {str(e)}", exc_info=True) + print(json.dumps({ + "decision": "deny", + "reason": f"Manual Slop hook server unreachable or API error: {str(e)}" + })) + except Exception as e: + # Fallback for unexpected errors during initial processing (e.g., stdin read) + logging.error(f"An unexpected error occurred in the main bridge logic: {str(e)}", exc_info=True) + print(json.dumps({ + "decision": "deny", + "reason": f"Internal bridge error: {str(e)}" + })) if __name__ == "__main__": - main() + main() diff --git a/scripts/inject_tools.py b/scripts/inject_tools.py new file mode 100644 index 0000000..5eeebed --- /dev/null +++ b/scripts/inject_tools.py @@ -0,0 +1,289 @@ +import os +import re + +with open('mcp_client.py', 'r', encoding='utf-8') as f: + content = f.read() + +# 1. Add import os if not there +if 'import os' not in content: + content = content.replace('import summarize', 'import os\nimport summarize') + +# 2. Add the functions before "# ------------------------------------------------------------------ web tools" +functions_code = r''' +def py_find_usages(path: str, name: str) -> str: + """Finds exact string matches of a symbol in a given file or directory.""" + p, err = _resolve_and_check(path) + if err: return err + try: + import re + pattern = re.compile(r"\b" + re.escape(name) + r"\b") + results = [] + def _search_file(fp): + if fp.name == "history.toml" or fp.name.endswith("_history.toml"): return + if not _is_allowed(fp): return + try: + text = fp.read_text(encoding="utf-8") + lines = text.splitlines() + for i, line in enumerate(lines, 1): + if pattern.search(line): + rel = fp.relative_to(_primary_base_dir if _primary_base_dir else Path.cwd()) + results.append(f"{rel}:{i}: {line.strip()[:100]}") + except Exception: + pass + + if p.is_file(): + _search_file(p) + else: + for root, dirs, files in os.walk(p): + dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('__pycache__', 'venv', 'env')] + for file in files: + if file.endswith(('.py', '.md', '.toml', '.txt', '.json')): + _search_file(Path(root) / file) + + if not results: + return f"No usages found for '{name}' in {p}" + if len(results) > 100: + return "\n".join(results[:100]) + f"\n... (and {len(results)-100} more)" + return "\n".join(results) + except Exception as e: + return f"ERROR finding usages for '{name}': {e}" + +def py_get_imports(path: str) -> str: + """Parses a file's AST and returns a strict list of its dependencies.""" + p, err = _resolve_and_check(path) + if err: return err + if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}" + try: + import ast + code = p.read_text(encoding="utf-8") + tree = ast.parse(code) + imports = [] + for node in tree.body: + if isinstance(node, ast.Import): + for alias in node.names: + imports.append(alias.name) + elif isinstance(node, ast.ImportFrom): + module = node.module or "" + for alias in node.names: + imports.append(f"{module}.{alias.name}" if module else alias.name) + if not imports: return "No imports found." + return "Imports:\n" + "\n".join(f" - {i}" for i in imports) + except Exception as e: + return f"ERROR getting imports for '{path}': {e}" + +def py_check_syntax(path: str) -> str: + """Runs a quick syntax check on a Python file.""" + p, err = _resolve_and_check(path) + if err: return err + if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}" + try: + import ast + code = p.read_text(encoding="utf-8") + ast.parse(code) + return f"Syntax OK: {path}" + except SyntaxError as e: + return f"SyntaxError in {path} at line {e.lineno}, offset {e.offset}: {e.msg}\n{e.text}" + except Exception as e: + return f"ERROR checking syntax for '{path}': {e}" + +def py_get_hierarchy(path: str, class_name: str) -> str: + """Scans the project to find subclasses of a given class.""" + p, err = _resolve_and_check(path) + if err: return err + import ast + subclasses = [] + + def _search_file(fp): + if not _is_allowed(fp): return + try: + code = fp.read_text(encoding="utf-8") + tree = ast.parse(code) + for node in ast.walk(tree): + if isinstance(node, ast.ClassDef): + for base in node.bases: + if isinstance(base, ast.Name) and base.id == class_name: + subclasses.append(f"{fp.name}: class {node.name}({class_name})") + elif isinstance(base, ast.Attribute) and base.attr == class_name: + subclasses.append(f"{fp.name}: class {node.name}({base.value.id}.{class_name})") + except Exception: + pass + + try: + if p.is_file(): + _search_file(p) + else: + for root, dirs, files in os.walk(p): + dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('__pycache__', 'venv', 'env')] + for file in files: + if file.endswith('.py'): + _search_file(Path(root) / file) + + if not subclasses: + return f"No subclasses of '{class_name}' found in {p}" + return f"Subclasses of '{class_name}':\n" + "\n".join(f" - {s}" for s in subclasses) + except Exception as e: + return f"ERROR finding subclasses of '{class_name}': {e}" + +def py_get_docstring(path: str, name: str) -> str: + """Extracts the docstring for a specific module, class, or function.""" + p, err = _resolve_and_check(path) + if err: return err + if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}" + try: + import ast + code = p.read_text(encoding="utf-8") + tree = ast.parse(code) + if not name or name == "module": + doc = ast.get_docstring(tree) + return doc if doc else "No module docstring found." + + node = _get_symbol_node(tree, name) + if not node: return f"ERROR: could not find symbol '{name}' in {path}" + doc = ast.get_docstring(node) + return doc if doc else f"No docstring found for '{name}'." + except Exception as e: + return f"ERROR getting docstring for '{name}': {e}" + +def get_tree(path: str, max_depth: int = 2) -> str: + """Returns a directory structure up to a max depth.""" + p, err = _resolve_and_check(path) + if err: return err + if not p.is_dir(): return f"ERROR: not a directory: {path}" + + try: + max_depth = int(max_depth) + def _build_tree(dir_path, current_depth, prefix=""): + if current_depth > max_depth: return [] + lines = [] + try: + entries = sorted(dir_path.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) + except PermissionError: + return [] + + # Filter + entries = [e for e in entries if not e.name.startswith('.') and e.name not in ('__pycache__', 'venv', 'env') and e.name != "history.toml" and not e.name.endswith("_history.toml")] + + for i, entry in enumerate(entries): + is_last = (i == len(entries) - 1) + connector = "└── " if is_last else "├── " + lines.append(f"{prefix}{connector}{entry.name}") + if entry.is_dir(): + extension = " " if is_last else "│ " + lines.extend(_build_tree(entry, current_depth + 1, prefix + extension)) + return lines + + tree_lines = [f"{p.name}/"] + _build_tree(p, 1) + return "\n".join(tree_lines) + except Exception as e: + return f"ERROR generating tree for '{path}': {e}" + +# ------------------------------------------------------------------ web tools''' + +content = content.replace('# ------------------------------------------------------------------ web tools', functions_code) + +# 3. Update TOOL_NAMES +old_tool_names_match = re.search(r'TOOL_NAMES\s*=\s*\{([^}]*)\}', content) +if old_tool_names_match: + old_names = old_tool_names_match.group(1) + new_names = old_names + ', "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree"' + content = content.replace(old_tool_names_match.group(0), f'TOOL_NAMES = {{{new_names}}}') + +# 4. Update dispatch +dispatch_additions = r''' + if tool_name == "py_find_usages": + return py_find_usages(tool_input.get("path", ""), tool_input.get("name", "")) + if tool_name == "py_get_imports": + return py_get_imports(tool_input.get("path", "")) + if tool_name == "py_check_syntax": + return py_check_syntax(tool_input.get("path", "")) + if tool_name == "py_get_hierarchy": + return py_get_hierarchy(tool_input.get("path", ""), tool_input.get("class_name", "")) + if tool_name == "py_get_docstring": + return py_get_docstring(tool_input.get("path", ""), tool_input.get("name", "")) + if tool_name == "get_tree": + return get_tree(tool_input.get("path", ""), tool_input.get("max_depth", 2)) + return f"ERROR: unknown MCP tool '{tool_name}'" +''' +content = re.sub(r' return f"ERROR: unknown MCP tool \'{tool_name}\'"', dispatch_additions.strip(), content) + +# 5. Update MCP_TOOL_SPECS +mcp_tool_specs_addition = r''' + { + "name": "py_find_usages", + "description": "Finds exact string matches of a symbol in a given file or directory.", + "parameters": { + "type": "object", + "properties": { + "path": { "type": "string", "description": "Path to file or directory to search." }, + "name": { "type": "string", "description": "The symbol/string to search for." } + }, + "required": ["path", "name"] + } + }, + { + "name": "py_get_imports", + "description": "Parses a file's AST and returns a strict list of its dependencies.", + "parameters": { + "type": "object", + "properties": { + "path": { "type": "string", "description": "Path to the .py file." } + }, + "required": ["path"] + } + }, + { + "name": "py_check_syntax", + "description": "Runs a quick syntax check on a Python file.", + "parameters": { + "type": "object", + "properties": { + "path": { "type": "string", "description": "Path to the .py file." } + }, + "required": ["path"] + } + }, + { + "name": "py_get_hierarchy", + "description": "Scans the project to find subclasses of a given class.", + "parameters": { + "type": "object", + "properties": { + "path": { "type": "string", "description": "Directory path to search in." }, + "class_name": { "type": "string", "description": "Name of the base class." } + }, + "required": ["path", "class_name"] + } + }, + { + "name": "py_get_docstring", + "description": "Extracts the docstring for a specific module, class, or function.", + "parameters": { + "type": "object", + "properties": { + "path": { "type": "string", "description": "Path to the .py file." }, + "name": { "type": "string", "description": "Name of symbol or 'module' for the file docstring." } + }, + "required": ["path", "name"] + } + }, + { + "name": "get_tree", + "description": "Returns a directory structure up to a max depth.", + "parameters": { + "type": "object", + "properties": { + "path": { "type": "string", "description": "Directory path." }, + "max_depth": { "type": "integer", "description": "Maximum depth to recurse (default 2)." } + }, + "required": ["path"] + } + } +] +''' + +content = re.sub(r'\]\s*$', mcp_tool_specs_addition.strip(), content) + +with open('mcp_client.py', 'w', encoding='utf-8') as f: + f.write(content) + +print("Injected new tools.") diff --git a/scripts/mma_exec.py b/scripts/mma_exec.py index d4256a7..39c0bdd 100644 --- a/scripts/mma_exec.py +++ b/scripts/mma_exec.py @@ -11,290 +11,260 @@ import datetime LOG_FILE = 'logs/mma_delegation.log' def generate_skeleton(code: str) -> str: - """ + """ Parses Python code and replaces function/method bodies with '...', preserving docstrings if present. """ - try: - PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language()) - parser = tree_sitter.Parser(PY_LANGUAGE) - tree = parser.parse(bytes(code, "utf8")) + try: + PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language()) + parser = tree_sitter.Parser(PY_LANGUAGE) + tree = parser.parse(bytes(code, "utf8")) + edits = [] - edits = [] + def is_docstring(node): + if node.type == "expression_statement" and node.child_count > 0: + if node.children[0].type == "string": + return True + return False - def is_docstring(node): - if node.type == "expression_statement" and node.child_count > 0: - if node.children[0].type == "string": - return True - return False - - def walk(node): - if node.type == "function_definition": - body = node.child_by_field_name("body") - if body and body.type == "block": - indent = " " * body.start_point.column - first_stmt = None - for child in body.children: - if child.type != "comment": - first_stmt = child - break - - if first_stmt and is_docstring(first_stmt): - start_byte = first_stmt.end_byte - end_byte = body.end_byte - if end_byte > start_byte: - edits.append((start_byte, end_byte, f"\n{indent}...")) - else: - start_byte = body.start_byte - end_byte = body.end_byte - edits.append((start_byte, end_byte, "...")) - - for child in node.children: - walk(child) - - walk(tree.root_node) - - edits.sort(key=lambda x: x[0], reverse=True) - code_bytes = bytearray(code, "utf8") - for start, end, replacement in edits: - code_bytes[start:end] = bytes(replacement, "utf8") - - return code_bytes.decode("utf8") - except Exception as e: - return f"# Error generating skeleton: {e}\n{code}" + def walk(node): + if node.type == "function_definition": + body = node.child_by_field_name("body") + if body and body.type == "block": + indent = " " * body.start_point.column + first_stmt = None + for child in body.children: + if child.type != "comment": + first_stmt = child + break + if first_stmt and is_docstring(first_stmt): + start_byte = first_stmt.end_byte + end_byte = body.end_byte + if end_byte > start_byte: + edits.append((start_byte, end_byte, f"\n{indent}...")) + else: + start_byte = body.start_byte + end_byte = body.end_byte + edits.append((start_byte, end_byte, "...")) + for child in node.children: + walk(child) + walk(tree.root_node) + edits.sort(key=lambda x: x[0], reverse=True) + code_bytes = bytearray(code, "utf8") + for start, end, replacement in edits: + code_bytes[start:end] = bytes(replacement, "utf8") + return code_bytes.decode("utf8") + except Exception as e: + return f"# Error generating skeleton: {e}\n{code}" def get_model_for_role(role: str) -> str: - """Returns the specific model to use for a given tier role.""" - if role == 'tier1-orchestrator' or role == 'tier1': - return 'gemini-3.1-pro-preview' - elif role == 'tier2-tech-lead' or role == 'tier2': - return 'gemini-3-flash-preview' - elif role == 'tier3-worker' or role == 'tier3': - return 'gemini-3-flash-preview' - elif role == 'tier4-qa' or role == 'tier4': - return 'gemini-2.5-flash-lite' - else: - return 'gemini-2.5-flash-lite' + """Returns the specific model to use for a given tier role.""" + if role == 'tier1-orchestrator' or role == 'tier1': + return 'gemini-3.1-pro-preview' + elif role == 'tier2-tech-lead' or role == 'tier2': + return 'gemini-3-flash-preview' + elif role == 'tier3-worker' or role == 'tier3': + return 'gemini-3-flash-preview' + elif role == 'tier4-qa' or role == 'tier4': + return 'gemini-2.5-flash-lite' + else: + return 'gemini-2.5-flash-lite' def get_role_documents(role: str) -> list[str]: - if role == 'tier1-orchestrator' or role == 'tier1': - return ['conductor/product.md', 'conductor/product-guidelines.md'] - elif role == 'tier2-tech-lead' or role == 'tier2': - return ['conductor/tech-stack.md', 'conductor/workflow.md'] - elif role == 'tier3-worker' or role == 'tier3': - return ['conductor/workflow.md'] - return [] + if role == 'tier1-orchestrator' or role == 'tier1': + return ['conductor/product.md', 'conductor/product-guidelines.md'] + elif role == 'tier2-tech-lead' or role == 'tier2': + return ['conductor/tech-stack.md', 'conductor/workflow.md'] + elif role == 'tier3-worker' or role == 'tier3': + return ['conductor/workflow.md'] + return [] def log_delegation(role, full_prompt, result=None, summary_prompt=None): - os.makedirs('logs/agents', exist_ok=True) - timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') - log_file = f'logs/agents/mma_{role}_task_{timestamp}.log' - - with open(log_file, 'w', encoding='utf-8') as f: - f.write("==================================================\n") - f.write(f"ROLE: {role}\n") - f.write(f"TIMESTAMP: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - f.write("--------------------------------------------------\n") - f.write(f"FULL PROMPT:\n{full_prompt}\n") - f.write("--------------------------------------------------\n") - if result: - f.write(f"RESULT:\n{result}\n") - f.write("==================================================\n") - - # Also keep the master log - os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) - display_prompt = summary_prompt if summary_prompt else full_prompt - with open(LOG_FILE, 'a', encoding='utf-8') as f: - f.write(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {role}: {display_prompt[:100]}... (Log: {log_file})\n") - - return log_file + os.makedirs('logs/agents', exist_ok=True) + timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') + log_file = f'logs/agents/mma_{role}_task_{timestamp}.log' + with open(log_file, 'w', encoding='utf-8') as f: + f.write("==================================================\n") + f.write(f"ROLE: {role}\n") + f.write(f"TIMESTAMP: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write("--------------------------------------------------\n") + f.write(f"FULL PROMPT:\n{full_prompt}\n") + f.write("--------------------------------------------------\n") + if result: + f.write(f"RESULT:\n{result}\n") + f.write("==================================================\n") + # Also keep the master log + os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) + display_prompt = summary_prompt if summary_prompt else full_prompt + with open(LOG_FILE, 'a', encoding='utf-8') as f: + f.write(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {role}: {display_prompt[:100]}... (Log: {log_file})\n") + return log_file def get_dependencies(filepath: str) -> list[str]: - """Identify top-level module imports from a Python file.""" - try: - with open(filepath, 'r', encoding='utf-8') as f: - tree = ast.parse(f.read()) - dependencies = [] - for node in tree.body: - if isinstance(node, ast.Import): - for alias in node.names: - dependencies.append(alias.name.split('.')[0]) - elif isinstance(node, ast.ImportFrom): - if node.module: - dependencies.append(node.module.split('.')[0]) - seen = set() - result = [] - for d in dependencies: - if d not in seen: - result.append(d) - seen.add(d) - return result - except Exception as e: - print(f"Error getting dependencies for {filepath}: {e}") - return [] + """Identify top-level module imports from a Python file.""" + try: + with open(filepath, 'r', encoding='utf-8') as f: + tree = ast.parse(f.read()) + dependencies = [] + for node in tree.body: + if isinstance(node, ast.Import): + for alias in node.names: + dependencies.append(alias.name.split('.')[0]) + elif isinstance(node, ast.ImportFrom): + if node.module: + dependencies.append(node.module.split('.')[0]) + seen = set() + result = [] + for d in dependencies: + if d not in seen: + result.append(d) + seen.add(d) + return result + except Exception as e: + print(f"Error getting dependencies for {filepath}: {e}") + return [] def execute_agent(role: str, prompt: str, docs: list[str]) -> str: - model = get_model_for_role(role) - - # Advanced Context: Dependency skeletons for Tier 3 - injected_context = "" - # Whitelist of modules that sub-agents have "unfettered" (full) access to. - # These will be provided in full if imported, instead of just skeletons. - UNFETTERED_MODULES = ['mcp_client', 'project_manager', 'events', 'aggregate'] - - if role in ['tier3', 'tier3-worker']: - for doc in docs: - if doc.endswith('.py') and os.path.exists(doc): - deps = get_dependencies(doc) - for dep in deps: - # Only try to generate skeletons for files that exist in the local dir - dep_file = f"{dep}.py" - - # Optimization: If the dependency is already in 'docs' (explicitly provided), - # do NOT inject its skeleton/full context again as a dependency. - if dep_file in docs: - continue - - if os.path.exists(dep_file) and dep_file != doc: - try: - if dep in UNFETTERED_MODULES: - with open(dep_file, 'r', encoding='utf-8') as f: - full_content = f.read() - injected_context += f"\n\nFULL MODULE CONTEXT: {dep_file}\n{full_content}\n" - else: - with open(dep_file, 'r', encoding='utf-8') as f: - skeleton = generate_skeleton(f.read()) - injected_context += f"\n\nDEPENDENCY SKELETON: {dep_file}\n{skeleton}\n" - except Exception as e: - print(f"Error gathering context for {dep_file}: {e}") - - # Check for token-bloat safety: if injected_context is too large, truncate it - if len(injected_context) > 15000: - injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]" - - # MMA Protocol: Tier 3 and 4 are stateless. - if role in ['tier3', 'tier3-worker']: - system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor). " \ - "Your goal is to implement specific code changes or tests based on the provided task. " \ - "You have access to tools for reading and writing files (e.g., read_file, write_file, replace), " \ - "codebase investigation (discovered_tool_get_code_outline, discovered_tool_get_python_skeleton), " \ - "version control (discovered_tool_get_git_diff), and web tools (discovered_tool_web_search, discovered_tool_fetch_url). " \ - "You CAN execute PowerShell scripts via discovered_tool_run_powershell for verification and testing. " \ - "Follow TDD and return success status or code changes. No pleasantries, no conversational filler." - elif role in ['tier4', 'tier4-qa']: - system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent. " \ - "Your goal is to analyze errors, summarize logs, or verify tests. " \ - "You have access to tools for reading files, exploring the codebase (discovered_tool_get_code_outline, discovered_tool_get_python_skeleton), " \ - "version control (discovered_tool_get_git_diff), and web tools (discovered_tool_web_search, discovered_tool_fetch_url). " \ - "You CAN execute PowerShell scripts via discovered_tool_run_powershell for diagnostics. " \ - "ONLY output the requested analysis. No pleasantries." - else: - system_directive = f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. " \ - "ONLY output the requested text. No pleasantries." - - command_text = f"{system_directive}\n\n{injected_context}\n\n" - - # Manually inline documents to ensure sub-agent has context in headless mode - for doc in docs: - if os.path.exists(doc): - try: - with open(doc, 'r', encoding='utf-8') as f: - content = f.read() - command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n" - except Exception as e: - print(f"Error inlining {doc}: {e}") - - command_text += f"\n\nTASK: {prompt}\n\n" - - # Use subprocess with input to pipe the prompt via stdin, avoiding WinError 206. - # We use -p 'mma_task' to ensure non-interactive (headless) mode and valid parsing. - # Whitelist tools to ensure they are available to the model in headless mode. - allowed_tools = "read_file,write_file,replace,list_directory,glob,grep_search,discovered_tool_search_files,discovered_tool_get_file_summary,discovered_tool_get_python_skeleton,discovered_tool_get_code_outline,discovered_tool_get_git_diff,discovered_tool_run_powershell,activate_skill,codebase_investigator,discovered_tool_web_search,discovered_tool_fetch_url" - ps_command = ( - f"if (Test-Path 'C:\\projects\\misc\\setup_gemini.ps1') {{ . 'C:\\projects\\misc\\setup_gemini.ps1' }}; " - f"gemini -p 'mma_task' --allowed-tools {allowed_tools} --output-format json --model {model}" - ) - cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command] - - - try: - process = subprocess.run(cmd, input=command_text, capture_output=True, text=True, encoding='utf-8') - - result = process.stdout - if not process.stdout and process.stderr: - result = f"Error: {process.stderr}" - - # Log the attempt and result - log_file = log_delegation(role, command_text, result, summary_prompt=prompt) - print(f"Sub-agent log created: {log_file}") - - stdout = process.stdout - start_index = stdout.find('{') - if start_index != -1: - json_str = stdout[start_index:] - try: - data = json.loads(json_str) - return data.get('response', stdout) - except json.JSONDecodeError: - return stdout - return stdout - except Exception as e: - err_msg = f"Execution failed: {str(e)}" - log_delegation(role, command_text, err_msg) - return err_msg + model = get_model_for_role(role) + # Advanced Context: Dependency skeletons for Tier 3 + injected_context = "" + # Whitelist of modules that sub-agents have "unfettered" (full) access to. + # These will be provided in full if imported, instead of just skeletons. + UNFETTERED_MODULES = ['mcp_client', 'project_manager', 'events', 'aggregate'] + if role in ['tier3', 'tier3-worker']: + for doc in docs: + if doc.endswith('.py') and os.path.exists(doc): + deps = get_dependencies(doc) + for dep in deps: + # Only try to generate skeletons for files that exist in the local dir + dep_file = f"{dep}.py" + # Optimization: If the dependency is already in 'docs' (explicitly provided), + # do NOT inject its skeleton/full context again as a dependency. + if dep_file in docs: + continue + if os.path.exists(dep_file) and dep_file != doc: + try: + if dep in UNFETTERED_MODULES: + with open(dep_file, 'r', encoding='utf-8') as f: + full_content = f.read() + injected_context += f"\n\nFULL MODULE CONTEXT: {dep_file}\n{full_content}\n" + else: + with open(dep_file, 'r', encoding='utf-8') as f: + skeleton = generate_skeleton(f.read()) + injected_context += f"\n\nDEPENDENCY SKELETON: {dep_file}\n{skeleton}\n" + except Exception as e: + print(f"Error gathering context for {dep_file}: {e}") + # Check for token-bloat safety: if injected_context is too large, truncate it + if len(injected_context) > 15000: + injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]" + # MMA Protocol: Tier 3 and 4 are stateless. + if role in ['tier3', 'tier3-worker']: + system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor). " \ + "Your goal is to implement specific code changes or tests based on the provided task. " \ + "You have access to tools for reading and writing files (e.g., read_file, write_file, replace), " \ + "codebase investigation (discovered_tool_py_get_code_outline, discovered_tool_py_get_skeleton, discovered_tool_py_find_usages, discovered_tool_py_get_imports, discovered_tool_py_check_syntax, discovered_tool_get_tree), " \ + "version control (discovered_tool_get_git_diff), and web tools (discovered_tool_web_search, discovered_tool_fetch_url). " \ + "You CAN execute PowerShell scripts via discovered_tool_run_powershell for verification and testing. " \ + "Follow TDD and return success status or code changes. No pleasantries, no conversational filler." + elif role in ['tier4', 'tier4-qa']: + system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent. " \ + "Your goal is to analyze errors, summarize logs, or verify tests. " \ + "You have access to tools for reading files, exploring the codebase (discovered_tool_py_get_code_outline, discovered_tool_py_get_skeleton, discovered_tool_py_find_usages, discovered_tool_py_get_imports), " \ + "version control (discovered_tool_get_git_diff), and web tools (discovered_tool_web_search, discovered_tool_fetch_url). " \ + "You CAN execute PowerShell scripts via discovered_tool_run_powershell for diagnostics. " \ + "ONLY output the requested analysis. No pleasantries." + else: + system_directive = f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. " \ + "ONLY output the requested text. No pleasantries." + command_text = f"{system_directive}\n\n{injected_context}\n\n" + # Manually inline documents to ensure sub-agent has context in headless mode + for doc in docs: + if os.path.exists(doc): + try: + with open(doc, 'r', encoding='utf-8') as f: + content = f.read() + command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n" + except Exception as e: + print(f"Error inlining {doc}: {e}") + command_text += f"\n\nTASK: {prompt}\n\n" + # Use subprocess with input to pipe the prompt via stdin, avoiding WinError 206. + # We use -p 'mma_task' to ensure non-interactive (headless) mode and valid parsing. + # Whitelist tools to ensure they are available to the model in headless mode. + allowed_tools = "read_file,write_file,replace,list_directory,glob,grep_search,discovered_tool_search_files,discovered_tool_get_file_summary,discovered_tool_py_get_skeleton,discovered_tool_py_get_code_outline,discovered_tool_py_get_definition,discovered_tool_py_update_definition,discovered_tool_py_get_signature,discovered_tool_py_set_signature,discovered_tool_py_get_class_summary,discovered_tool_py_get_var_declaration,discovered_tool_py_set_var_declaration,discovered_tool_get_git_diff,discovered_tool_run_powershell,activate_skill,codebase_investigator,discovered_tool_web_search,discovered_tool_fetch_url,discovered_tool_py_find_usages,discovered_tool_py_get_imports,discovered_tool_py_check_syntax,discovered_tool_py_get_hierarchy,discovered_tool_py_get_docstring,discovered_tool_get_tree" + ps_command = ( + f"if (Test-Path 'C:\\projects\\misc\\setup_gemini.ps1') {{ . 'C:\\projects\\misc\\setup_gemini.ps1' }}; " + f"gemini -p 'mma_task' --allowed-tools {allowed_tools} --output-format json --model {model}" + ) + cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command] + try: + process = subprocess.run(cmd, input=command_text, capture_output=True, text=True, encoding='utf-8') + result = process.stdout + if not process.stdout and process.stderr: + result = f"Error: {process.stderr}" + # Log the attempt and result + log_file = log_delegation(role, command_text, result, summary_prompt=prompt) + print(f"Sub-agent log created: {log_file}") + stdout = process.stdout + start_index = stdout.find('{') + if start_index != -1: + json_str = stdout[start_index:] + try: + data = json.loads(json_str) + return data.get('response', stdout) + except json.JSONDecodeError: + return stdout + return stdout + except Exception as e: + err_msg = f"Execution failed: {str(e)}" + log_delegation(role, command_text, err_msg) + return err_msg def create_parser(): - parser = argparse.ArgumentParser(description="MMA Execution Script") - parser.add_argument( - "--role", - choices=['tier1', 'tier2', 'tier3', 'tier4', 'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa'], - help="The tier role to execute" - ) - parser.add_argument( - "--task-file", - type=str, - help="TOML file defining the task" - ) - parser.add_argument( - "prompt", - type=str, - nargs='?', - help="The prompt for the tier (optional if --task-file is used)" - ) - return parser + parser = argparse.ArgumentParser(description="MMA Execution Script") + parser.add_argument( + "--role", + choices=['tier1', 'tier2', 'tier3', 'tier4', 'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa'], + help="The tier role to execute" + ) + parser.add_argument( + "--task-file", + type=str, + help="TOML file defining the task" + ) + parser.add_argument( + "prompt", + type=str, + nargs='?', + help="The prompt for the tier (optional if --task-file is used)" + ) + return parser def main(): - parser = create_parser() - args = parser.parse_args() - - role = args.role - prompt = args.prompt - docs = [] - - if args.task_file and os.path.exists(args.task_file): - with open(args.task_file, "rb") as f: - task_data = tomllib.load(f) - role = task_data.get("role", role) - prompt = task_data.get("prompt", prompt) - docs = task_data.get("docs", []) - - if not role or not prompt: - parser.print_help() - return - - if not docs: - docs = get_role_documents(role) - - # Extract @file references from the prompt - import re - file_refs = re.findall(r"@([\w./\\]+)", prompt) - for ref in file_refs: - if os.path.exists(ref) and ref not in docs: - docs.append(ref) - - print(f"Executing role: {role} with docs: {docs}") - result = execute_agent(role, prompt, docs) - print(result) + parser = create_parser() + args = parser.parse_args() + role = args.role + prompt = args.prompt + docs = [] + if args.task_file and os.path.exists(args.task_file): + with open(args.task_file, "rb") as f: + task_data = tomllib.load(f) + role = task_data.get("role", role) + prompt = task_data.get("prompt", prompt) + docs = task_data.get("docs", []) + if not role or not prompt: + parser.print_help() + return + if not docs: + docs = get_role_documents(role) + # Extract @file references from the prompt + import re + file_refs = re.findall(r"@([\w./\\]+)", prompt) + for ref in file_refs: + if os.path.exists(ref) and ref not in docs: + docs.append(ref) + print(f"Executing role: {role} with docs: {docs}") + result = execute_agent(role, prompt, docs) + print(result) if __name__ == "__main__": - main() + main() diff --git a/scripts/slice_tools.py b/scripts/slice_tools.py new file mode 100644 index 0000000..7aa89e0 --- /dev/null +++ b/scripts/slice_tools.py @@ -0,0 +1,64 @@ +import sys +import ast + +def get_slice(filepath, start_line, end_line): + with open(filepath, 'r', encoding='utf-8') as f: + lines = f.readlines() + start_idx = int(start_line) - 1 + end_idx = int(end_line) + return "".join(lines[start_idx:end_idx]) + +def set_slice(filepath, start_line, end_line, new_content): + with open(filepath, 'r', encoding='utf-8') as f: + lines = f.readlines() + start_idx = int(start_line) - 1 + end_idx = int(end_line) + if new_content and not new_content.endswith(chr(10)): + new_content += chr(10) + new_lines = new_content.splitlines(True) if new_content else [] + lines[start_idx:end_idx] = new_lines + with open(filepath, 'w', encoding='utf-8', newline='') as f: + f.writelines(lines) + +def get_def(filepath, symbol_name): + with open(filepath, 'r', encoding='utf-8') as f: + content = f.read() + tree = ast.parse(content) + for node in ast.walk(tree): + if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): + if node.name == symbol_name: + start = node.lineno + end = node.end_lineno + if node.decorator_list: + start = node.decorator_list[0].lineno + slice_content = get_slice(filepath, start, end) + return f"{start},{end}{chr(10)}{slice_content}" + return "NOT_FOUND" + +def set_def(filepath, symbol_name, new_content): + res = get_def(filepath, symbol_name) + if res == "NOT_FOUND": + print(f"Error: Symbol '{symbol_name}' not found in {filepath}") + sys.exit(1) + lines_info = res.split(chr(10), 1)[0] + start, end = lines_info.split(',') + set_slice(filepath, start, end, new_content) + print(f"Successfully updated '{symbol_name}' (lines {start}-{end}) in {filepath}") + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("Usage: python slice_tools.py [args...]") + sys.exit(1) + cmd = sys.argv[1] + if cmd == "get_slice": + print(get_slice(sys.argv[2], sys.argv[3], sys.argv[4]), end="") + elif cmd == "set_slice": + with open(sys.argv[5], 'r', encoding='utf-8') as f: + new_content = f.read() + set_slice(sys.argv[2], sys.argv[3], sys.argv[4], new_content) + elif cmd == "get_def": + print(get_def(sys.argv[2], sys.argv[3]), end="") + elif cmd == "set_def": + with open(sys.argv[4], 'r', encoding='utf-8') as f: + new_content = f.read() + set_def(sys.argv[2], sys.argv[3], new_content) diff --git a/scripts/temp_def.py b/scripts/temp_def.py new file mode 100644 index 0000000..35518fb --- /dev/null +++ b/scripts/temp_def.py @@ -0,0 +1,97 @@ +def format_code(source: str) -> str: + """ + Formats Python code to use exactly 1 space for indentation (including continuations), + max 1 blank line between top-level definitions, and 0 blank lines inside + function/method bodies. + + Args: + source: The Python source code to format. + + Returns: + The formatted source code. + """ + if not source: + return "" + tokens = list(tokenize.generate_tokens(io.StringIO(source).readline)) + lines = source.splitlines(keepends=True) + num_lines = len(lines) + block_level = 0 + paren_level = 0 + in_function_stack = [] + expecting_function_indent = False + line_indent = {} + line_is_blank = {i: True for i in range(1, num_lines + 2)} + line_is_string_interior = {i: False for i in range(1, num_lines + 2)} + line_seen = set() + pending_blank_lines = [] + for tok in tokens: + t_type = tok.type + t_string = tok.string + start_line, _ = tok.start + end_line, _ = tok.end + if t_type == tokenize.STRING: + for l in range(start_line + 1, end_line + 1): + line_is_string_interior[l] = True + if t_type not in (tokenize.NL, tokenize.NEWLINE, tokenize.INDENT, tokenize.DEDENT, tokenize.ENDMARKER): + for l in range(start_line, end_line + 1): + line_is_blank[l] = False + pending_blank_lines = [] # Real content seen, clear pending blanks + # State updates that affect CURRENT line + if t_type == tokenize.INDENT: + block_level += 1 + if expecting_function_indent: + in_function_stack.append(block_level) + expecting_function_indent = False + elif t_type == tokenize.DEDENT: + block_level -= 1 + if in_function_stack and block_level < in_function_stack[-1]: + in_function_stack.pop() + # Retroactively update pending blank lines to the current (outer) level + for l in pending_blank_lines: + line_indent[l] = block_level + paren_level + if t_string in (')', ']', '}'): + paren_level -= 1 + if start_line not in line_seen: + line_indent[start_line] = block_level + paren_level + if t_type not in (tokenize.INDENT, tokenize.DEDENT): + line_seen.add(start_line) + if t_type in (tokenize.NL, tokenize.NEWLINE): + pending_blank_lines.append(start_line) + # State updates that affect FUTURE lines/tokens + if t_type == tokenize.NAME and t_string == 'def': + expecting_function_indent = True + if t_string in ('(', '[', '{'): + paren_level += 1 + output = [] + consecutive_blanks = 0 + for i in range(1, num_lines + 1): + if line_is_string_interior[i]: + output.append(lines[i-1]) + continue + if line_is_blank[i]: + indent = line_indent.get(i, 0) + if indent > 0: + continue + else: + if consecutive_blanks < 1: + output.append("\n") + consecutive_blanks += 1 + continue + original_line = lines[i-1] + indent = line_indent.get(i, 0) + stripped = original_line.lstrip() + # Enforce a 1-line gap before definitions/classes + is_def_start = stripped.startswith(('def ', 'class ', 'async def ', '@')) + if is_def_start and output and consecutive_blanks == 0: + prev_line = output[-1].strip() + # Don't add a gap if immediately following a block opener or another decorator + if prev_line and not prev_line.endswith(':') and not prev_line.startswith('@'): + output.append("\n") + consecutive_blanks += 1 + consecutive_blanks = 0 + output.append(" " * indent + stripped) + if not stripped.endswith('\n') and i < num_lines: + output[-1] += '\n' + if output and not output[-1].endswith('\n'): + output[-1] += '\n' + return "".join(output) diff --git a/scripts/tool_call.py b/scripts/tool_call.py index 5263131..0ef4c00 100644 --- a/scripts/tool_call.py +++ b/scripts/tool_call.py @@ -11,44 +11,40 @@ sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) try: - import mcp_client - import shell_runner + import mcp_client + import shell_runner except ImportError: - print(json.dumps({"error": "Failed to import required modules"})) - sys.exit(1) + print(json.dumps({"error": "Failed to import required modules"})) + sys.exit(1) def main(): - if len(sys.argv) < 2: - print(json.dumps({"error": "No tool name provided"})) - sys.exit(1) - - tool_name = sys.argv[1] - - # Read arguments from stdin - try: - input_data = sys.stdin.read() - if input_data: - tool_input = json.loads(input_data) - else: - tool_input = {} - except json.JSONDecodeError: - print(json.dumps({"error": "Invalid JSON input"})) - sys.exit(1) - - try: - if tool_name == "run_powershell": - script = tool_input.get("script", "") - # We use the current directory as base_dir for CLI calls - result = shell_runner.run_powershell(script, os.getcwd()) - else: - # mcp_client tools generally resolve paths relative to CWD if not configured. - result = mcp_client.dispatch(tool_name, tool_input) - - # We print the raw result string as that's what gemini-cli expects. - print(result) - except Exception as e: - print(f"ERROR executing tool {tool_name}: {e}") - sys.exit(1) + if len(sys.argv) < 2: + print(json.dumps({"error": "No tool name provided"})) + sys.exit(1) + tool_name = sys.argv[1] + # Read arguments from stdin + try: + input_data = sys.stdin.read() + if input_data: + tool_input = json.loads(input_data) + else: + tool_input = {} + except json.JSONDecodeError: + print(json.dumps({"error": "Invalid JSON input"})) + sys.exit(1) + try: + if tool_name == "run_powershell": + script = tool_input.get("script", "") + # We use the current directory as base_dir for CLI calls + result = shell_runner.run_powershell(script, os.getcwd()) + else: + # mcp_client tools generally resolve paths relative to CWD if not configured. + result = mcp_client.dispatch(tool_name, tool_input) + # We print the raw result string as that's what gemini-cli expects. + print(result) + except Exception as e: + print(f"ERROR executing tool {tool_name}: {e}") + sys.exit(1) if __name__ == "__main__": - main() + main() diff --git a/scripts/tool_discovery.py b/scripts/tool_discovery.py index 1d3c09f..8a8945a 100644 --- a/scripts/tool_discovery.py +++ b/scripts/tool_discovery.py @@ -6,44 +6,41 @@ import os sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) try: - import mcp_client + import mcp_client except ImportError as e: - # Print the error to stderr to diagnose - print(f"ImportError in discovery: {e}", file=sys.stderr) - print("[]") - sys.exit(0) +# Print the error to stderr to diagnose + print(f"ImportError in discovery: {e}", file=sys.stderr) + print("[]") + sys.exit(0) def main(): - specs = list(mcp_client.MCP_TOOL_SPECS) - - # Add run_powershell (manually define to match ai_client.py) - specs.append({ - "name": "run_powershell", - "description": ( - "Run a PowerShell script within the project base_dir. " - "Use this to create, edit, rename, or delete files and directories. " - "The working directory is set to base_dir automatically. " - "stdout and stderr are returned to you as the result." - ), - "parameters": { - "type": "object", - "properties": { - "script": { - "type": "string", - "description": "The PowerShell script to execute." - } - }, - "required": ["script"] - } - }) - - # Rename 'parameters' to 'parametersJsonSchema' for Gemini CLI - for spec in specs: - if "parameters" in spec: - spec["parametersJsonSchema"] = spec.pop("parameters") - - # Output as JSON array of FunctionDeclarations - print(json.dumps(specs, indent=2)) + specs = list(mcp_client.MCP_TOOL_SPECS) + # Add run_powershell (manually define to match ai_client.py) + specs.append({ + "name": "run_powershell", + "description": ( + "Run a PowerShell script within the project base_dir. " + "Use this to create, edit, rename, or delete files and directories. " + "The working directory is set to base_dir automatically. " + "stdout and stderr are returned to you as the result." + ), + "parameters": { + "type": "object", + "properties": { + "script": { + "type": "string", + "description": "The PowerShell script to execute." + } + }, + "required": ["script"] + } + }) + # Rename 'parameters' to 'parametersJsonSchema' for Gemini CLI + for spec in specs: + if "parameters" in spec: + spec["parametersJsonSchema"] = spec.pop("parameters") + # Output as JSON array of FunctionDeclarations + print(json.dumps(specs, indent=2)) if __name__ == "__main__": - main() + main() diff --git a/scripts/type_hint_scanner.py b/scripts/type_hint_scanner.py index 4c0bfff..20ba7cc 100644 --- a/scripts/type_hint_scanner.py +++ b/scripts/type_hint_scanner.py @@ -4,7 +4,6 @@ import sys def get_missing_hints(file_path: str): with open(file_path, "r", encoding="utf-8") as f: tree = ast.parse(f.read()) - missing = [] for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): @@ -15,7 +14,7 @@ def get_missing_hints(file_path: str): "name": node.name, "lineno": node.lineno, "col_offset": node.col_offset - }) + }) return missing if __name__ == "__main__":