from __future__ import annotations

import io
import json
import logging
from datetime import date
from decimal import Decimal

import pytest

from quartermaster import month_service, service
from quartermaster.logging_config import LOG_CONFIG
from quartermaster.models import Section


def _build_formatter():
    """Build the "json" formatter described in LOG_CONFIG so it can be tested directly.

    The formatter entry uses the dictConfig ``"()"`` key to name its factory;
    that dotted path is resolved with ``pydoc.locate`` and the remaining keys
    are passed to the factory as keyword arguments.
    """
    from pydoc import locate

    # Work on a copy so popping "()" does not mutate the shared LOG_CONFIG.
    factory_kwargs = dict(LOG_CONFIG["formatters"]["json"])
    factory_path = factory_kwargs.pop("()")
    formatter_factory = locate(factory_path)
    assert formatter_factory is not None, f"formatter factory not importable: {factory_path}"
    return formatter_factory(**factory_kwargs)


def _events_named(caplog, event_name):
    """Return the captured log records whose ``event`` attribute equals *event_name*."""
    return [r for r in caplog.records if getattr(r, "event", None) == event_name]


def test_log_config_emits_json_with_required_fields():
    """One record through the JSON formatter yields a JSON line with the core fields."""
    buffer = io.StringIO()
    stream_handler = logging.StreamHandler(buffer)
    stream_handler.setFormatter(_build_formatter())

    smoke_logger = logging.getLogger("tests.logging_smoke")
    smoke_logger.handlers = [stream_handler]
    smoke_logger.propagate = False
    smoke_logger.setLevel(logging.INFO)

    smoke_logger.info("smoke message", extra={"event": "smoke"})

    line = buffer.getvalue().strip()
    assert line, "formatter produced no output"

    payload = json.loads(line)
    expected_fields = {
        "event": "smoke",
        "level": "INFO",
        "logger": "tests.logging_smoke",
        "message": "smoke message",
    }
    for key, expected in expected_fields.items():
        assert payload[key] == expected
    assert "timestamp" in payload


def test_access_log_filter_enriches_record():
    """A uvicorn-style access record is kept and gains structured request attributes."""
    from quartermaster.logging_config import AccessLogFilter

    access_record = logging.LogRecord(
        name="uvicorn.access",
        level=logging.INFO,
        pathname=__file__,
        lineno=0,
        msg='%s - "%s %s HTTP/%s" %d',
        args=("127.0.0.1:1234", "GET", "/healthz", "1.1", 200),
        exc_info=None,
    )

    assert AccessLogFilter().filter(access_record) is True

    expected_attrs = {
        "event": "http_request",
        "method": "GET",
        "path": "/healthz",
        "status": 200,
        "client_ip": "127.0.0.1:1234",
    }
    for attr, expected in expected_attrs.items():
        assert getattr(access_record, attr) == expected


def test_access_log_filter_passes_through_non_access_records():
    """Records without the access-log args tuple are kept and left unenriched."""
    from quartermaster.logging_config import AccessLogFilter

    plain_record = logging.LogRecord(
        name="quartermaster",
        level=logging.INFO,
        pathname=__file__,
        lineno=0,
        msg="regular app log",
        args=None,
        exc_info=None,
    )

    assert AccessLogFilter().filter(plain_record) is True
    # Each attribute is either absent or explicitly None.
    assert getattr(plain_record, "event", None) is None
    assert getattr(plain_record, "method", None) is None


def test_log_config_loads_via_dictconfig():
    """The complete LOG_CONFIG is accepted by logging.config.dictConfig without raising."""
    from logging.config import dictConfig

    dictConfig(LOG_CONFIG)


@pytest.fixture(autouse=True)
def _reset_quartermaster_logger():
    """Keep the quartermaster logger propagating to root while each test runs.

    test_log_config_loads_via_dictconfig applies LOG_CONFIG through
    logging.config.dictConfig, which leaves propagate=False on the
    quartermaster logger. caplog attaches its handler to the root logger, so
    it only sees records when propagation is on. Propagation is therefore
    switched on before every test in this module and the previous value is
    put back afterwards.
    """
    target = logging.getLogger("quartermaster")
    saved_propagate = target.propagate
    target.propagate = True
    yield
    target.propagate = saved_propagate


def _make_debt_minimum(db, name="Loan", amount=Decimal("100.00")):
    """Persist a debt_minimum Entry and return it after refreshing it from the session."""
    from quartermaster.models import Entry

    debt_entry = Entry(section=Section.debt_minimum, name=name, amount=amount)
    db.add(debt_entry)
    db.commit()
    db.refresh(debt_entry)
    return debt_entry


def test_create_month_logs_month_created_event(db, caplog):
    """create_month logs exactly one INFO record tagged month_created."""
    caplog.set_level(logging.INFO, logger="quartermaster")

    month_service.create_month(db, "2026-05")

    created = _events_named(caplog, "month_created")
    assert len(created) == 1
    assert created[0].levelname == "INFO"


def test_close_month_logs_month_closed_event(db, caplog):
    """close_month logs exactly one record tagged month_closed."""
    month = month_service.create_month(db, "2026-06")
    # Walk the month through active -> closed; no entries exist yet, so the
    # applied budget is zero.
    month_service.activate_month(db, month)
    caplog.clear()
    caplog.set_level(logging.INFO, logger="quartermaster")

    month_service.close_month(db, month)

    assert len(_events_named(caplog, "month_closed")) == 1


def test_add_posting_logs_posting_added_event(db, caplog):
    """add_posting logs exactly one record tagged posting_added."""
    _make_debt_minimum(db)
    month = month_service.create_month(db, "2026-07")
    first_entry = month.entries[0]
    caplog.set_level(logging.INFO, logger="quartermaster")

    month_service.add_posting(
        db,
        month,
        first_entry.id,
        occurred_on=date(2026, 7, 15),
        amount=Decimal("25.00"),
    )

    assert len(_events_named(caplog, "posting_added")) == 1


def test_delete_posting_logs_posting_deleted_event(db, caplog):
    """delete_posting logs exactly one record tagged posting_deleted."""
    _make_debt_minimum(db)
    month = month_service.create_month(db, "2026-08")
    first_entry = month.entries[0]
    posting = month_service.add_posting(
        db,
        month,
        first_entry.id,
        occurred_on=date(2026, 8, 15),
        amount=Decimal("10.00"),
    )
    caplog.clear()
    caplog.set_level(logging.INFO, logger="quartermaster")

    month_service.delete_posting(db, month, posting.id)

    assert len(_events_named(caplog, "posting_deleted")) == 1