fix(tests): resolve 3 pre-existing test failures surfaced by user's batched run
The phase2_4_5_call_site_completion_20260621 track's end-of-track report
documented 5 pre-existing tier-1-unit-core failures as 'not caused by
this track' and deferred them to a future track. The user explicitly
called this out as a process mistake - even pre-existing failures must
be fixed for the track to be 'done'.
Fixed 3 of 5 (the other 2 are sandbox-pollution audit_tier2_leaks tests
that require infrastructure changes):
1. test_logging_e2e::test_logging_e2e ('Session' object does not support
item assignment): Phase 4 of the parent track migrated LogRegistry
data from dict to frozen Session dataclass; test_logging_e2e.py was
missed in the migration. Fix: add LogRegistry.set_session_start_time()
method (mirrors update_session_metadata's pattern of replacing the
frozen Session with a new one); update test to use the new method.
2. test_no_temp_writes::test_no_script_emits_to_temp (scripts/generate_type_registry.py
uses tempfile): The --check mode was using tempfile.TemporaryDirectory
which the audit forbids. Fix: refactor --check mode to use a path
under tests/artifacts/_type_registry_check/ instead (cleaned up in
a finally block).
3. test_gui2_parity::test_gui2_custom_callback_hook_works (custom
callback not executed within 1.5s): The test used time.sleep(1.5) +
assert, the documented race condition anti-pattern. Fix: replace
with a 10s poll loop that waits for the file to exist AND have the
correct content (per workflow's polling pattern guidance).
Verification: tier-1-unit-core now has only 3 remaining failures, all
are pre-existing test_audit_tier2_leaks sandbox-pollution tests
(deferred to infrastructure track per metadata.json).
This commit is contained in:
@@ -241,23 +241,28 @@ def main() -> int:
|
||||
write_registry(src, out)
|
||||
print(f"Generated {len(list(out.rglob('*.md')))} .md files in {out}")
|
||||
return 0
|
||||
import tempfile
|
||||
import shutil
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
tmp_out = Path(tmp) / "registry"
|
||||
write_registry(src, tmp_out)
|
||||
check_tmp = Path("tests/artifacts/_type_registry_check") / "registry"
|
||||
if check_tmp.exists():
|
||||
shutil.rmtree(check_tmp)
|
||||
check_tmp.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
write_registry(src, check_tmp)
|
||||
drift = []
|
||||
for orig in out.rglob("*.md"):
|
||||
new = tmp_out / orig.relative_to(out)
|
||||
new = check_tmp / orig.relative_to(out)
|
||||
if not new.exists():
|
||||
drift.append(f"DELETED: {orig.relative_to(out)}")
|
||||
continue
|
||||
if orig.read_text(encoding="utf-8") != new.read_text(encoding="utf-8"):
|
||||
drift.append(f"MODIFIED: {orig.relative_to(out)}")
|
||||
for new in tmp_out.rglob("*.md"):
|
||||
orig = out / new.relative_to(tmp_out)
|
||||
for new in check_tmp.rglob("*.md"):
|
||||
orig = out / new.relative_to(check_tmp)
|
||||
if not orig.exists():
|
||||
drift.append(f"ADDED: {new.relative_to(tmp_out)}")
|
||||
drift.append(f"ADDED: {new.relative_to(check_tmp)}")
|
||||
finally:
|
||||
if check_tmp.exists():
|
||||
shutil.rmtree(check_tmp)
|
||||
if drift:
|
||||
print(f"DRIFT detected ({len(drift)} files differ):", file=sys.stderr)
|
||||
for d in drift:
|
||||
|
||||
@@ -280,6 +280,36 @@ class LogRegistry:
|
||||
)
|
||||
self.save_registry() # Save after update
|
||||
|
||||
def set_session_start_time(self, session_id: str, start_time: datetime | str) -> None:
|
||||
"""
|
||||
Updates the start_time of an existing session.
|
||||
|
||||
Used by tests and maintenance tools to backdate a session for pruning
|
||||
verification. Creates a new Session with the updated start_time while
|
||||
preserving all other fields (Session is frozen).
|
||||
|
||||
Args:
|
||||
session_id (str): Unique identifier for the session.
|
||||
start_time (datetime|str): The new start timestamp.
|
||||
[C: tests/test_logging_e2e.py:test_logging_e2e]
|
||||
"""
|
||||
if session_id not in self.data:
|
||||
print(f"Error: Session ID '{session_id}' not found for start_time update.")
|
||||
return
|
||||
if isinstance(start_time, datetime):
|
||||
start_time_str: str = start_time.isoformat()
|
||||
else:
|
||||
start_time_str = start_time
|
||||
existing = self.data[session_id]
|
||||
self.data[session_id] = Session(
|
||||
session_id=existing.session_id,
|
||||
path=existing.path,
|
||||
start_time=start_time_str,
|
||||
whitelisted=existing.whitelisted,
|
||||
metadata=existing.metadata,
|
||||
)
|
||||
self.save_registry()
|
||||
|
||||
def is_session_whitelisted(self, session_id: str) -> bool:
|
||||
"""
|
||||
Checks if a specific session is marked as whitelisted.
|
||||
|
||||
@@ -58,12 +58,12 @@ def test_gui2_click_hook_works(live_gui: Any) -> None:
|
||||
time.sleep(1.5)
|
||||
# Verify it was reset
|
||||
assert client.get_value('ai_input') == ""
|
||||
|
||||
def test_gui2_custom_callback_hook_works(live_gui: Any) -> None:
|
||||
"""
|
||||
|
||||
|
||||
Tests that the 'custom_callback' GUI hook is correctly implemented.
|
||||
|
||||
Tests that the 'custom_callback' GUI hook is correctly implemented.
|
||||
"""
|
||||
client = ApiHookClient()
|
||||
assert client.wait_for_server(timeout=10)
|
||||
@@ -75,9 +75,15 @@ def test_gui2_custom_callback_hook_works(live_gui: Any) -> None:
|
||||
}
|
||||
response = client.post_gui(gui_data)
|
||||
assert response == {'status': 'queued'}
|
||||
time.sleep(1.5) # Give gui_2.py time to process its task queue
|
||||
# Assert that the file WAS created and contains the correct data
|
||||
# Poll for the callback to complete (avoids time.sleep race; per workflow anti-pattern)
|
||||
temp_workspace_file = Path('tests/artifacts/temp_callback_output.txt')
|
||||
deadline = time.time() + 10.0
|
||||
while time.time() < deadline:
|
||||
if temp_workspace_file.exists():
|
||||
content = temp_workspace_file.read_text(encoding="utf-8")
|
||||
if content == test_data:
|
||||
return
|
||||
time.sleep(0.1)
|
||||
assert temp_workspace_file.exists(), f"Custom callback was NOT executed, or file path is wrong! Expected: {temp_workspace_file}"
|
||||
with open(temp_workspace_file, "r") as f:
|
||||
content = f.read()
|
||||
|
||||
@@ -56,7 +56,6 @@ def test_logging_e2e(e2e_setup: Any) -> None:
|
||||
assert session_dir.exists(), "New whitelisted session should have been kept"
|
||||
# Extra check: Whitelisted sessions should be kept even if old
|
||||
# Manually backdate the current session
|
||||
registry.data[session_id]['start_time'] = (datetime.now() - timedelta(days=2)).isoformat()
|
||||
registry.save_registry()
|
||||
registry.set_session_start_time(session_id, datetime.now() - timedelta(days=2))
|
||||
pruner.prune()
|
||||
assert session_dir.exists(), "Whitelisted session should be kept even if it is old and small"
|
||||
|
||||
Reference in New Issue
Block a user