quartermaster/tests/test_logging.py

190 lines
5.7 KiB
Python

from __future__ import annotations
import io
import json
import logging
from datetime import date
from decimal import Decimal
import pytest
from quartermaster import month_service, service
from quartermaster.logging_config import LOG_CONFIG
from quartermaster.models import Section
def _build_formatter():
    """Build the ``json`` formatter described by LOG_CONFIG, for direct use in tests.

    The ``"()"`` key of the formatter entry holds the dotted path of the
    factory; every other key in the entry is forwarded to that factory as a
    keyword argument. LOG_CONFIG itself is left untouched.
    """
    from pydoc import locate

    spec = LOG_CONFIG["formatters"]["json"]
    factory_path = spec["()"]
    # Everything except the factory marker becomes a constructor kwarg.
    kwargs = {key: value for key, value in spec.items() if key != "()"}

    factory = locate(factory_path)
    assert factory is not None, f"formatter factory not importable: {factory_path}"
    return factory(**kwargs)
def test_log_config_emits_json_with_required_fields():
    """A record formatted by the LOG_CONFIG JSON formatter carries the core fields.

    The record is emitted through a real logger so that ``extra=`` is applied
    the same way as in application code. The logger is registered globally,
    so its state is saved up front and restored afterwards; ``disabled`` is
    forced off because an earlier ``dictConfig`` call in the same process may
    have disabled it, which would otherwise make this test produce no output.
    """
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(_build_formatter())

    logger = logging.getLogger("tests.logging_smoke")
    saved_handlers = logger.handlers[:]
    saved_propagate = logger.propagate
    saved_level = logger.level
    saved_disabled = logger.disabled

    logger.handlers = [handler]
    logger.propagate = False
    logger.disabled = False
    logger.setLevel(logging.INFO)
    try:
        logger.info("smoke message", extra={"event": "smoke"})
    finally:
        # Leave the shared logger exactly as we found it.
        logger.handlers = saved_handlers
        logger.propagate = saved_propagate
        logger.disabled = saved_disabled
        logger.setLevel(saved_level)

    line = stream.getvalue().strip()
    assert line, "formatter produced no output"

    payload = json.loads(line)
    assert payload["event"] == "smoke"
    assert payload["level"] == "INFO"
    assert payload["logger"] == "tests.logging_smoke"
    assert payload["message"] == "smoke message"
    assert "timestamp" in payload
def test_access_log_filter_enriches_record():
    """AccessLogFilter keeps an access-style record and attaches structured fields.

    The record mimics the five-element args tuple used here for access lines:
    client address, method, path, HTTP version and status code.
    """
    from quartermaster.logging_config import AccessLogFilter

    access_args = ("127.0.0.1:1234", "GET", "/healthz", "1.1", 200)
    record = logging.LogRecord(
        name="uvicorn.access",
        level=logging.INFO,
        pathname=__file__,
        lineno=0,
        msg='%s - "%s %s HTTP/%s" %d',
        args=access_args,
        exc_info=None,
    )

    kept = AccessLogFilter().filter(record)
    assert kept is True

    # Attribute order matches the order in which the fields are checked.
    enriched = {
        field: getattr(record, field)
        for field in ("event", "method", "path", "status", "client_ip")
    }
    assert enriched == {
        "event": "http_request",
        "method": "GET",
        "path": "/healthz",
        "status": 200,
        "client_ip": "127.0.0.1:1234",
    }
def test_access_log_filter_passes_through_non_access_records():
    """A record without access-style args is kept and gains no request fields.

    The record here has ``args=None``; the filter must still return True, and
    ``event`` / ``method`` must be either absent or None afterwards.
    """
    from quartermaster.logging_config import AccessLogFilter

    plain_record = logging.LogRecord(
        name="quartermaster",
        level=logging.INFO,
        pathname=__file__,
        lineno=0,
        msg="regular app log",
        args=None,
        exc_info=None,
    )

    verdict = AccessLogFilter().filter(plain_record)
    assert verdict is True

    # A missing attribute and an attribute set to None are both acceptable.
    for field in ("event", "method"):
        assert getattr(plain_record, field, None) is None
def test_log_config_loads_via_dictconfig():
    """Verify the whole LOG_CONFIG passes logging.config.dictConfig without error.

    ``dictConfig`` changes process-wide logging state: it can replace
    handlers and levels on the root and named loggers, and (depending on
    ``disable_existing_loggers``) disable loggers that already exist. To keep
    that from leaking into later tests, the state of the root logger, the
    ``quartermaster`` logger and every other logger that already exists is
    captured first and put back afterwards, whether or not the call raises.
    """
    import logging.config

    from quartermaster.logging_config import LOG_CONFIG

    root = logging.getLogger()
    tracked = [root, logging.getLogger("quartermaster")]
    # loggerDict also contains PlaceHolder objects; only real loggers carry state.
    tracked.extend(
        existing
        for existing in root.manager.loggerDict.values()
        if isinstance(existing, logging.Logger) and existing not in tracked
    )
    snapshot = [
        (lg, lg.handlers[:], lg.level, lg.propagate, lg.disabled)
        for lg in tracked
    ]

    try:
        logging.config.dictConfig(LOG_CONFIG)
    finally:
        for lg, handlers, level, propagate, disabled in snapshot:
            lg.handlers = handlers
            lg.propagate = propagate
            lg.disabled = disabled
            # setLevel (rather than assigning .level) keeps the logging
            # module's internal bookkeeping consistent with the new level.
            lg.setLevel(level)
def _make_debt_minimum(db, name="Loan", amount=Decimal("100.00")):
    """Store a ``Section.debt_minimum`` Entry via *db* and return it.

    The entry is added, committed and then refreshed from *db* before being
    handed back to the caller.
    """
    from quartermaster.models import Entry

    fields = {"section": Section.debt_minimum, "name": name, "amount": amount}
    debt_entry = Entry(**fields)

    db.add(debt_entry)
    db.commit()
    db.refresh(debt_entry)
    return debt_entry
def test_create_month_logs_month_created_event(db, caplog):
    """Creating a month logs exactly one INFO record with event ``month_created``."""
    caplog.set_level(logging.INFO, logger="quartermaster")

    month_service.create_month(db, "2026-05")

    created = []
    for record in caplog.records:
        # Records without an ``event`` attribute are simply skipped.
        if getattr(record, "event", None) == "month_created":
            created.append(record)

    assert len(created) == 1
    assert created[0].levelname == "INFO"
def test_close_month_logs_month_closed_event(db, caplog):
    """Closing an activated month logs exactly one ``month_closed`` record."""
    month = month_service.create_month(db, "2026-06")
    # The month is activated before closing; this test creates no entries.
    month_service.activate_month(db, month)

    # Drop whatever the setup calls logged so only close_month is observed.
    caplog.clear()
    caplog.set_level(logging.INFO, logger="quartermaster")

    month_service.close_month(db, month)

    closed = []
    for record in caplog.records:
        if getattr(record, "event", None) == "month_closed":
            closed.append(record)
    assert len(closed) == 1
def test_add_posting_logs_posting_added_event(db, caplog):
    """Adding a posting logs exactly one ``posting_added`` record."""
    _make_debt_minimum(db)
    month = month_service.create_month(db, "2026-07")
    first_entry = month.entries[0]

    caplog.set_level(logging.INFO, logger="quartermaster")

    posting_fields = {
        "occurred_on": date(2026, 7, 15),
        "amount": Decimal("25.00"),
    }
    month_service.add_posting(db, month, first_entry.id, **posting_fields)

    added = list(
        filter(
            lambda record: getattr(record, "event", None) == "posting_added",
            caplog.records,
        )
    )
    assert len(added) == 1
def test_delete_posting_logs_posting_deleted_event(db, caplog):
    """Deleting a posting logs exactly one ``posting_deleted`` record."""
    _make_debt_minimum(db)
    month = month_service.create_month(db, "2026-08")
    first_entry = month.entries[0]

    posting_fields = {
        "occurred_on": date(2026, 8, 15),
        "amount": Decimal("10.00"),
    }
    posting = month_service.add_posting(db, month, first_entry.id, **posting_fields)

    # Discard records from the setup above so only the delete is inspected.
    caplog.clear()
    caplog.set_level(logging.INFO, logger="quartermaster")

    month_service.delete_posting(db, month, posting.id)

    deleted = list(
        filter(
            lambda record: getattr(record, "event", None) == "posting_deleted",
            caplog.records,
        )
    )
    assert len(deleted) == 1
def test_update_entry_logs_template_entry_updated_event(db, caplog):
    """Updating an entry logs one ``template_entry_updated`` record with its id."""
    from quartermaster.models import Entry

    power_bill = Entry(section=Section.fixed_bill, name="Power", amount=Decimal("50.00"))
    db.add(power_bill)
    db.commit()
    db.refresh(power_bill)

    caplog.set_level(logging.INFO, logger="quartermaster")

    service.update_entry(db, power_bill.id, amount=Decimal("55.00"))

    updated = []
    for record in caplog.records:
        if getattr(record, "event", None) == "template_entry_updated":
            updated.append(record)

    assert len(updated) == 1
    # The log record must identify which entry was changed.
    assert updated[0].entry_id == power_bill.id