From 14cfd53514884b441873a00764f1f7aed12e7a2a Mon Sep 17 00:00:00 2001 From: Jeff Smith Date: Wed, 8 Apr 2026 20:03:42 -0600 Subject: [PATCH] feat(arxiv): ingest pipeline (M5.1.1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #38. First sub-milestone of M5.1 (Researcher #2: arxiv-rag). New package researchers/arxiv/ with three modules: - store.py — ArxivStore wraps a persistent chromadb collection at ~/.marchwarden/arxiv-rag/chroma/ plus a papers.json manifest. Chunk ids are deterministic and embedding-model-scoped (per ArxivRagProposal decision 4) so re-ingesting with a different embedder doesn't collide with prior chunks. - ingest.py — three-phase pipeline: download_pdf (arxiv API), extract_sections (pymupdf with heuristic heading detection + whole-paper fallback), and embed_and_store (sentence-transformers, configurable via MARCHWARDEN_ARXIV_EMBED_MODEL). Top-level ingest() chains them and upserts the manifest entry. Re-ingest is idempotent — chunks for the same paper are dropped before re-adding. - CLI subgroup `marchwarden arxiv add|list|info|remove`. Lazy-imports the heavy chromadb / torch deps so non-arxiv commands stay fast. The heavy ML deps (pymupdf, chromadb, sentence-transformers, arxiv) are gated behind an optional `[arxiv]` extra so the base install stays slim for users who only want the web researcher. Tests: 14 added (141 total passing). Real pymupdf against synthetic PDFs generated at test time covers extract_sections; chromadb and the embedder are stubbed via dependency injection so the tests stay fast, deterministic, and network-free. End-to-end ingest() is exercised with a mocked arxiv.Search that produces synthetic PDFs. 
Out of scope for #38 (covered by later sub-milestones): - Retrieval / search API (#39) - ArxivResearcher agent loop (#40) - MCP server (#41) - ask --researcher arxiv flag (#42) - Cost ledger embedding_calls field (#43) Notes: - pip install pulled in CUDA torch wheel (~2GB nvidia libs); harmless on CPU-only WSL but a future optimization would pin the CPU torch index. - Live smoke against a real arxiv id deferred so we don't block the M3.3 collection runner currently using the venv. Co-Authored-By: Claude Opus 4.6 (1M context) --- cli/main.py | 129 +++++++++++ pyproject.toml | 8 + researchers/arxiv/__init__.py | 7 + researchers/arxiv/ingest.py | 370 ++++++++++++++++++++++++++++++ researchers/arxiv/store.py | 214 ++++++++++++++++++ tests/test_arxiv_ingest.py | 415 ++++++++++++++++++++++++++++++++++ 6 files changed, 1143 insertions(+) create mode 100644 researchers/arxiv/__init__.py create mode 100644 researchers/arxiv/ingest.py create mode 100644 researchers/arxiv/store.py create mode 100644 tests/test_arxiv_ingest.py diff --git a/cli/main.py b/cli/main.py index eef9ceb..5a7479a 100644 --- a/cli/main.py +++ b/cli/main.py @@ -534,5 +534,134 @@ def costs( render_costs(filtered, console) +# --------------------------------------------------------------------------- +# arxiv subgroup (M5.1.1) +# --------------------------------------------------------------------------- + + +@cli.group() +def arxiv() -> None: + """Manage the local arxiv-rag corpus. + + Sub-commands let you ingest arxiv papers, list what's indexed, and + inspect individual entries. Retrieval and search ship in #39+. + """ + + +@arxiv.command("add") +@click.argument("arxiv_ids", nargs=-1, required=True) +@click.option( + "--embedding-model", + default=None, + help=( + "Override embedding model. Defaults to " + "$MARCHWARDEN_ARXIV_EMBED_MODEL or nomic-ai/nomic-embed-text-v1.5." 
+ ), +) +def arxiv_add(arxiv_ids: tuple[str, ...], embedding_model: Optional[str]) -> None: + """Download, extract, embed, and index one or more arxiv papers by ID.""" + # Imported lazily so the CLI doesn't pay the chromadb / torch import + # cost on every invocation — only when the user actually runs an + # arxiv command. + from researchers.arxiv.ingest import DEFAULT_EMBEDDING_MODEL, ingest + from researchers.arxiv.store import ArxivStore + + console = Console() + store = ArxivStore() + model = embedding_model or DEFAULT_EMBEDDING_MODEL + + for arxiv_id in arxiv_ids: + console.print(f"[dim]Ingesting:[/dim] {arxiv_id} (model={model})") + try: + record = ingest(arxiv_id, store=store, model_name=model) + except Exception as exc: + console.print(f"[bold red]Failed:[/bold red] {arxiv_id}: {exc}") + continue + console.print( + f" -> [green]ok[/green] {record.title or '(no title)'} " + f"({record.chunks_indexed} chunks)" + ) + + +@arxiv.command("list") +def arxiv_list() -> None: + """Show all indexed arxiv papers.""" + from researchers.arxiv.store import ArxivStore + + console = Console() + store = ArxivStore() + papers = store.list_papers() + + if not papers: + console.print( + "[dim]No papers indexed yet. 
Use[/dim] " + "[bold]marchwarden arxiv add <arxiv_id>[/bold]" + ) + return + + table = Table(title=f"Indexed papers ({len(papers)})", show_lines=False, expand=True) + table.add_column("arxiv_id", style="cyan") + table.add_column("Title", overflow="fold") + table.add_column("Year", justify="right", width=6) + table.add_column("Chunks", justify="right", width=6) + table.add_column("Model", overflow="fold") + for p in papers: + table.add_row( + p.arxiv_id, + p.title or "(no title)", + str(p.year) if p.year else "—", + str(p.chunks_indexed), + p.embedding_model, + ) + console.print(table) + + +@arxiv.command("info") +@click.argument("arxiv_id") +def arxiv_info(arxiv_id: str) -> None: + """Show metadata + chunk count for one indexed paper.""" + from researchers.arxiv.store import ArxivStore + + console = Console() + store = ArxivStore() + record = store.get_paper(arxiv_id) + if record is None: + console.print( + f"[bold red]Not indexed:[/bold red] {arxiv_id}. " + f"Use [bold]marchwarden arxiv add {arxiv_id}[/bold]."
+ ) + sys.exit(1) + + text = Text() + text.append(f"arxiv_id: {record.arxiv_id}\n", style="bold") + text.append(f"title: {record.title or '(none)'}\n") + text.append(f"authors: {', '.join(record.authors) or '(none)'}\n") + text.append(f"year: {record.year or '(unknown)'}\n") + text.append(f"category: {record.category or '(unknown)'}\n") + text.append(f"chunks: {record.chunks_indexed}\n") + text.append(f"embedding_model: {record.embedding_model}\n") + text.append(f"added_at: {record.added_at}\n") + console.print(Panel(text, title=arxiv_id, border_style="cyan")) + + +@arxiv.command("remove") +@click.argument("arxiv_id") +def arxiv_remove(arxiv_id: str) -> None: + """Drop one paper from the manifest and chromadb collection.""" + from researchers.arxiv.store import ArxivStore + + console = Console() + store = ArxivStore() + chunks_removed = store.delete_paper(arxiv_id) + in_manifest = store.remove_paper(arxiv_id) + if not in_manifest and chunks_removed == 0: + console.print(f"[yellow]Not found:[/yellow] {arxiv_id}") + sys.exit(1) + console.print( + f"[green]Removed[/green] {arxiv_id} " + f"({chunks_removed} chunks dropped)" + ) + + if __name__ == "__main__": cli() diff --git a/pyproject.toml b/pyproject.toml index c2140c0..fda277f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,14 @@ dev = [ "ruff>=0.1.0", "mypy>=1.0", ] +# arxiv-rag researcher (M5.1). Heavy ML deps — optional extra so the base +# install stays slim for users who only want the web researcher. +arxiv = [ + "pymupdf>=1.24", + "chromadb>=0.5", + "sentence-transformers>=3.0", + "arxiv>=2.1", +] [project.scripts] marchwarden = "cli.main:cli" diff --git a/researchers/arxiv/__init__.py b/researchers/arxiv/__init__.py new file mode 100644 index 0000000..8d05dc0 --- /dev/null +++ b/researchers/arxiv/__init__.py @@ -0,0 +1,7 @@ +"""arxiv-rag researcher. + +Second researcher implementation in Marchwarden. M5.1.1 is the ingest +pipeline only — see researchers.arxiv.ingest and researchers.arxiv.store. 
+The agent loop, MCP server, and ResearchContract integration ship in +later sub-milestones (#39–#43). +""" diff --git a/researchers/arxiv/ingest.py b/researchers/arxiv/ingest.py new file mode 100644 index 0000000..2574539 --- /dev/null +++ b/researchers/arxiv/ingest.py @@ -0,0 +1,370 @@ +"""Ingest pipeline for the arxiv-rag researcher. + +Public surface: + + download_pdf(arxiv_id, store) -> Path + extract_sections(pdf_path) -> list[Section] + embed_and_store(arxiv_id, sections, store, model_name, metadata) -> int + ingest(arxiv_id, store=None, model_name=...) -> PaperRecord # one-shot + +The split exists so unit tests can mock each phase independently. The +top-level ``ingest()`` is what the CLI calls. + +Section detection is heuristic: we walk the PDF page by page, look for +short lines that match a small set of canonical academic headings +(introduction, methods, results, discussion, conclusion, references, +etc.), and use those as section boundaries. If nothing matches, we fall +back to one Section containing the entire paper text — citations to +that section will still be valid, just less precise. +""" + +from __future__ import annotations + +import os +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Callable, Optional + +from .store import ArxivStore, PaperRecord, make_chunk_id + + +# --------------------------------------------------------------------------- +# Defaults +# --------------------------------------------------------------------------- + +DEFAULT_EMBEDDING_MODEL = os.environ.get( + "MARCHWARDEN_ARXIV_EMBED_MODEL", + "nomic-ai/nomic-embed-text-v1.5", +) + +# Headings considered "section starters" for the heuristic. Order +# matters only for documentation; matching is case-insensitive and +# whole-line. 
+_SECTION_HEADINGS = [ + "abstract", + "introduction", + "background", + "related work", + "preliminaries", + "methods", + "method", + "methodology", + "approach", + "model", + "experiments", + "experimental setup", + "evaluation", + "results", + "discussion", + "analysis", + "limitations", + "conclusion", + "conclusions", + "future work", + "references", + "acknowledgments", + "appendix", +] + +# Compiled match: optional leading number ("3", "3.1", "III"), optional +# trailing punctuation, the heading word, end of line. +_HEADING_RE = re.compile( + r"^\s*(?:[0-9IVX]+\.?[0-9.]*)?\s*(?P<title>" + "|".join(_SECTION_HEADINGS) + r")\s*$", + re.IGNORECASE, +) + + +@dataclass +class Section: + """One section of a paper.""" + + index: int + title: str + text: str + page_start: int + page_end: int + + +@dataclass +class PaperMetadata: + """Lightweight metadata extracted from arxiv at download time.""" + + arxiv_id: str + version: str + title: str + authors: list[str] = field(default_factory=list) + year: Optional[int] = None + category: Optional[str] = None + + +# --------------------------------------------------------------------------- +# Phase 1 — download +# --------------------------------------------------------------------------- + + +def download_pdf( + arxiv_id: str, + store: ArxivStore, + *, + arxiv_search: Optional[Callable] = None, +) -> tuple[Path, PaperMetadata]: + """Download a paper PDF and return its cached path + arxiv metadata. + + ``arxiv_search`` is injectable for tests so we can avoid hitting the + real arxiv API. The default uses the ``arxiv`` package. + """ + target = store.pdfs_dir / f"{arxiv_id}.pdf" + + if arxiv_search is None: + import arxiv as arxiv_pkg + + search = arxiv_pkg.Search(id_list=[arxiv_id]) + results = list(search.results()) + else: + results = list(arxiv_search(arxiv_id)) + + if not results: + raise ValueError(f"arxiv id not found: {arxiv_id}") + + paper = results[0] + + # Download the PDF if we don't already have it cached.
+ if not target.exists(): + # Both the real arxiv.Result and our test stub expose + # download_pdf(dirpath, filename). Test stubs may also accept a + # destination Path directly — try that first, fall back. + try: + paper.download_pdf( + dirpath=str(store.pdfs_dir), + filename=f"{arxiv_id}.pdf", + ) + except TypeError: + paper.download_pdf(str(target)) + + metadata = PaperMetadata( + arxiv_id=arxiv_id, + version=getattr(paper, "entry_id", "").rsplit("v", 1)[-1] if "v" in getattr(paper, "entry_id", "") else "", + title=getattr(paper, "title", "") or "", + authors=[ + getattr(a, "name", str(a)) + for a in (getattr(paper, "authors", []) or []) + ], + year=( + getattr(paper, "published", None).year + if getattr(paper, "published", None) is not None + else None + ), + category=getattr(paper, "primary_category", None), + ) + return target, metadata + + +# --------------------------------------------------------------------------- +# Phase 2 — extract sections +# --------------------------------------------------------------------------- + + +def extract_sections(pdf_path: Path) -> list[Section]: + """Extract section-level chunks from a PDF. + + Heuristic: walk pages, split on lines that match a known section + heading. If no headings are detected, return one Section containing + the whole document. + """ + import pymupdf + + doc = pymupdf.open(str(pdf_path)) + try: + # Build a flat list of (page_num, line) tuples for the whole doc. + lines: list[tuple[int, str]] = [] + for page_num, page in enumerate(doc, start=1): + text = page.get_text("text") or "" + for raw_line in text.splitlines(): + stripped = raw_line.strip() + if stripped: + lines.append((page_num, stripped)) + finally: + doc.close() + + # Find heading boundaries. + boundaries: list[tuple[int, str, int]] = [] # (line_index, title, page_num) + for i, (page_num, line) in enumerate(lines): + if len(line) > 80: + # Section headings are short. Skip likely body text. 
+ continue + m = _HEADING_RE.match(line) + if m: + boundaries.append((i, m.group("title").strip().title(), page_num)) + + sections: list[Section] = [] + + if not boundaries: + # Fallback: whole paper as one section. + full_text = "\n".join(line for _, line in lines) + if not full_text.strip(): + return [] + first_page = lines[0][0] if lines else 1 + last_page = lines[-1][0] if lines else 1 + return [ + Section( + index=0, + title="Full Paper", + text=full_text, + page_start=first_page, + page_end=last_page, + ) + ] + + # Build sections between consecutive boundaries. + for idx, (start_line, title, page_start) in enumerate(boundaries): + end_line = ( + boundaries[idx + 1][0] if idx + 1 < len(boundaries) else len(lines) + ) + body_lines = lines[start_line + 1 : end_line] + text = "\n".join(line for _, line in body_lines).strip() + if not text: + continue + page_end = body_lines[-1][0] if body_lines else page_start + sections.append( + Section( + index=idx, + title=title, + text=text, + page_start=page_start, + page_end=page_end, + ) + ) + + if not sections: + # Headings detected but every section was empty — fall back to + # whole paper rather than dropping the document. + full_text = "\n".join(line for _, line in lines) + return [ + Section( + index=0, + title="Full Paper", + text=full_text, + page_start=lines[0][0], + page_end=lines[-1][0], + ) + ] + + return sections + + +# --------------------------------------------------------------------------- +# Phase 3 — embed and store +# --------------------------------------------------------------------------- + + +def _load_embedder(model_name: str): + """Load a sentence-transformers embedder. Cached at module level so + repeated ingests in the same process don't re-download / re-load. 
+ """ + cache = _load_embedder._cache # type: ignore[attr-defined] + if model_name in cache: + return cache[model_name] + from sentence_transformers import SentenceTransformer + + embedder = SentenceTransformer(model_name, trust_remote_code=True) + cache[model_name] = embedder + return embedder + + +_load_embedder._cache = {} # type: ignore[attr-defined] + + +def embed_and_store( + arxiv_id: str, + sections: list[Section], + store: ArxivStore, + model_name: str, + metadata: PaperMetadata, + *, + embedder: Optional[object] = None, +) -> int: + """Embed each section and write to the chromadb collection. + + ``embedder`` is injectable for tests so we don't have to load + sentence-transformers. It must expose ``encode(list[str]) -> list[list[float]]``. + Returns the number of chunks written. + """ + if not sections: + return 0 + + if embedder is None: + embedder = _load_embedder(model_name) + + texts = [s.text for s in sections] + raw_vectors = embedder.encode(texts) + # sentence-transformers returns a numpy.ndarray; chromadb wants + # plain lists. Handle both shapes. + embeddings: list[list[float]] = [] + for vec in raw_vectors: + if hasattr(vec, "tolist"): + embeddings.append(vec.tolist()) + else: + embeddings.append(list(vec)) + + ids = [make_chunk_id(arxiv_id, s.index, model_name) for s in sections] + metadatas = [ + { + "arxiv_id": arxiv_id, + "section_index": s.index, + "section_title": s.title, + "page_start": s.page_start, + "page_end": s.page_end, + "title": metadata.title, + "embedding_model": model_name, + } + for s in sections + ] + + # Replace any prior chunks for this paper under this embedding model + # before re-adding. Idempotency: re-ingest with the same model is a + # no-op in observable state. 
+ store.delete_paper(arxiv_id) + store.add_chunks(ids=ids, documents=texts, embeddings=embeddings, metadatas=metadatas) + return len(ids) + + +# --------------------------------------------------------------------------- +# Top-level orchestrator +# --------------------------------------------------------------------------- + + +def ingest( + arxiv_id: str, + store: Optional[ArxivStore] = None, + *, + model_name: str = DEFAULT_EMBEDDING_MODEL, + arxiv_search: Optional[Callable] = None, + embedder: Optional[object] = None, +) -> PaperRecord: + """End-to-end ingest: download → extract → embed → store → manifest.""" + store = store or ArxivStore() + + pdf_path, metadata = download_pdf(arxiv_id, store, arxiv_search=arxiv_search) + sections = extract_sections(pdf_path) + chunk_count = embed_and_store( + arxiv_id=arxiv_id, + sections=sections, + store=store, + model_name=model_name, + metadata=metadata, + embedder=embedder, + ) + + record = PaperRecord( + arxiv_id=arxiv_id, + version=metadata.version, + title=metadata.title, + authors=metadata.authors, + year=metadata.year, + category=metadata.category, + chunks_indexed=chunk_count, + embedding_model=model_name, + ) + store.upsert_paper(record) + return record diff --git a/researchers/arxiv/store.py b/researchers/arxiv/store.py new file mode 100644 index 0000000..531af60 --- /dev/null +++ b/researchers/arxiv/store.py @@ -0,0 +1,214 @@ +"""Chromadb wrapper for the arxiv-rag researcher. + +The store lives at ``~/.marchwarden/arxiv-rag/`` and contains: + +- ``papers.json`` — manifest mapping arxiv_id -> metadata +- ``pdfs/<id>.pdf`` — cached PDFs +- ``chroma/`` — chromadb persistent collection of embedded chunks + +This module is intentionally narrow: it exposes the persistent state and +the operations the ingest + retrieval layers need (add chunks, fetch +manifest, list papers). The retrieval layer (#39) will add a query API +on top of the same collection. 
+ +Chunk IDs are deterministic and include the embedding model name in +their hash so re-ingesting a paper with a different embedder creates a +new ID space rather than silently overwriting old citations +(ArxivRagProposal §1, decision 4). +""" + +from __future__ import annotations + +import hashlib +import json +import os +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable, Optional + + +DEFAULT_ROOT = Path(os.path.expanduser("~/.marchwarden/arxiv-rag")) +DEFAULT_COLLECTION = "arxiv_chunks" + + +@dataclass +class PaperRecord: + """Manifest entry for one indexed paper.""" + + arxiv_id: str + version: str + title: str + authors: list[str] + year: Optional[int] + category: Optional[str] + chunks_indexed: int + embedding_model: str + added_at: str = field( + default_factory=lambda: datetime.now(timezone.utc) + .isoformat(timespec="seconds") + .replace("+00:00", "Z") + ) + + def to_dict(self) -> dict: + return { + "version": self.version, + "title": self.title, + "authors": list(self.authors), + "year": self.year, + "category": self.category, + "chunks_indexed": self.chunks_indexed, + "embedding_model": self.embedding_model, + "added_at": self.added_at, + } + + @classmethod + def from_dict(cls, arxiv_id: str, data: dict) -> "PaperRecord": + return cls( + arxiv_id=arxiv_id, + version=data.get("version", ""), + title=data.get("title", ""), + authors=list(data.get("authors", [])), + year=data.get("year"), + category=data.get("category"), + chunks_indexed=int(data.get("chunks_indexed", 0)), + embedding_model=data.get("embedding_model", ""), + added_at=data.get("added_at", ""), + ) + + +def make_chunk_id(arxiv_id: str, section_index: int, embedding_model: str) -> str: + """Deterministic chunk id, scoped by embedding model. + + Format: ``<arxiv_id>::<section_index>::<sha1(model)[0:8]>``. The + model hash slice keeps the id readable while making it unique across + embedding models. 
See ArxivRagProposal §1 decision 4 — re-ingesting + with a different model must not collide with prior chunks. + """ + model_hash = hashlib.sha1(embedding_model.encode("utf-8")).hexdigest()[:8] + return f"{arxiv_id}::{section_index:04d}::{model_hash}" + + +class ArxivStore: + """File-backed manifest + chromadb collection for indexed papers.""" + + def __init__( + self, + root: Optional[Path] = None, + collection_name: str = DEFAULT_COLLECTION, + ): + self.root = Path(root) if root else DEFAULT_ROOT + self.pdfs_dir = self.root / "pdfs" + self.chroma_dir = self.root / "chroma" + self.manifest_path = self.root / "papers.json" + self.collection_name = collection_name + + self.root.mkdir(parents=True, exist_ok=True) + self.pdfs_dir.mkdir(parents=True, exist_ok=True) + self.chroma_dir.mkdir(parents=True, exist_ok=True) + + self._client = None # lazy + self._collection = None # lazy + + # ------------------------------------------------------------------ + # Chroma — lazy because importing chromadb is slow + # ------------------------------------------------------------------ + + @property + def collection(self): + """Lazy chromadb collection handle.""" + if self._collection is None: + import chromadb + + self._client = chromadb.PersistentClient(path=str(self.chroma_dir)) + self._collection = self._client.get_or_create_collection( + name=self.collection_name, + # Cosine distance — typical for sentence-transformer + # embeddings normalized to unit length. 
+ metadata={"hnsw:space": "cosine"}, + ) + return self._collection + + def add_chunks( + self, + ids: list[str], + documents: list[str], + embeddings: list[list[float]], + metadatas: list[dict[str, Any]], + ) -> None: + """Add a batch of embedded chunks to the collection.""" + if not ids: + return + if not (len(ids) == len(documents) == len(embeddings) == len(metadatas)): + raise ValueError( + "ids/documents/embeddings/metadatas must all have the same length" + ) + self.collection.upsert( + ids=ids, + documents=documents, + embeddings=embeddings, + metadatas=metadatas, + ) + + def chunk_count_for(self, arxiv_id: str) -> int: + """Number of chunks currently stored for one paper.""" + # chromadb's get() with a where filter returns the matching docs; + # we just need the count. + try: + res = self.collection.get(where={"arxiv_id": arxiv_id}) + except Exception: + return 0 + return len(res.get("ids", [])) + + def delete_paper(self, arxiv_id: str) -> int: + """Remove all chunks for one paper. Returns number deleted.""" + before = self.chunk_count_for(arxiv_id) + if before == 0: + return 0 + self.collection.delete(where={"arxiv_id": arxiv_id}) + return before + + # ------------------------------------------------------------------ + # Manifest — plain JSON, atomic write + # ------------------------------------------------------------------ + + def load_manifest(self) -> dict[str, PaperRecord]: + """Read papers.json. 
Returns {} if missing.""" + if not self.manifest_path.exists(): + return {} + data = json.loads(self.manifest_path.read_text(encoding="utf-8")) + return { + arxiv_id: PaperRecord.from_dict(arxiv_id, entry) + for arxiv_id, entry in data.items() + } + + def save_manifest(self, manifest: dict[str, PaperRecord]) -> None: + """Write papers.json atomically.""" + payload = {arxiv_id: rec.to_dict() for arxiv_id, rec in manifest.items()} + tmp = self.manifest_path.with_suffix(".json.tmp") + tmp.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") + tmp.replace(self.manifest_path) + + def upsert_paper(self, record: PaperRecord) -> None: + """Insert or replace one entry in the manifest.""" + manifest = self.load_manifest() + manifest[record.arxiv_id] = record + self.save_manifest(manifest) + + def remove_paper(self, arxiv_id: str) -> bool: + """Drop one entry from the manifest. Returns True if removed.""" + manifest = self.load_manifest() + if arxiv_id not in manifest: + return False + del manifest[arxiv_id] + self.save_manifest(manifest) + return True + + def list_papers(self) -> list[PaperRecord]: + """All manifest entries, sorted by added_at descending (newest first).""" + manifest = self.load_manifest() + return sorted(manifest.values(), key=lambda r: r.added_at, reverse=True) + + def get_paper(self, arxiv_id: str) -> Optional[PaperRecord]: + """Manifest entry for one paper, or None.""" + return self.load_manifest().get(arxiv_id) diff --git a/tests/test_arxiv_ingest.py b/tests/test_arxiv_ingest.py new file mode 100644 index 0000000..392ce27 --- /dev/null +++ b/tests/test_arxiv_ingest.py @@ -0,0 +1,415 @@ +"""Tests for the arxiv-rag ingest pipeline (M5.1.1). + +Strategy: mock the slow / network bits (arxiv API, embedder, chromadb) +and exercise the real pipeline against synthetic PDFs generated with +pymupdf at test time. This keeps the tests deterministic, fast, and +network-free while still exercising the actual extract_sections logic. 
+""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +from researchers.arxiv import ingest as ingest_mod +from researchers.arxiv.ingest import ( + PaperMetadata, + Section, + embed_and_store, + extract_sections, + ingest, +) +from researchers.arxiv.store import ArxivStore, PaperRecord, make_chunk_id + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +def _make_synthetic_pdf(path: Path, sections: list[tuple[str, str]]) -> None: + """Build a tiny PDF with one section per (heading, body) tuple. + + pymupdf is already a hard dep of the arxiv extra, so synthesizing a + fixture PDF inline is cheaper than checking a binary into the repo. + """ + import pymupdf + + doc = pymupdf.open() + for heading, body in sections: + page = doc.new_page() + page.insert_text((50, 80), heading, fontsize=14) + # Wrap body across a few lines for realism + y = 110 + for line in body.split("\n"): + page.insert_text((50, y), line, fontsize=11) + y += 16 + doc.save(str(path)) + doc.close() + + +@pytest.fixture +def store(tmp_path): + """ArxivStore rooted in a temp directory.""" + return ArxivStore(root=tmp_path / "arxiv-rag") + + +class StubEmbedder: + """Minimal stand-in for sentence-transformers.SentenceTransformer.""" + + def __init__(self, dim: int = 4): + self.dim = dim + self.calls: list[list[str]] = [] + + def encode(self, texts): + self.calls.append(list(texts)) + # Return deterministic vectors keyed off text length so two + # different sections produce two different embeddings. 
+ return [[float(len(t)), 0.0, 0.0, 0.0] for t in texts] + + +class StubChromaCollection: + """In-memory drop-in for a chromadb collection.""" + + def __init__(self): + self.docs: dict[str, dict] = {} + + def upsert(self, ids, documents, embeddings, metadatas): + for i, doc, emb, meta in zip(ids, documents, embeddings, metadatas): + self.docs[i] = {"document": doc, "embedding": emb, "metadata": meta} + + def get(self, where=None): + if where is None: + ids = list(self.docs.keys()) + else: + ids = [ + i + for i, entry in self.docs.items() + if all(entry["metadata"].get(k) == v for k, v in where.items()) + ] + return {"ids": ids} + + def delete(self, where): + to_drop = [ + i + for i, entry in self.docs.items() + if all(entry["metadata"].get(k) == v for k, v in where.items()) + ] + for i in to_drop: + del self.docs[i] + + +@pytest.fixture +def stub_collection(monkeypatch): + """Replace ArxivStore.collection with an in-memory stub.""" + stub = StubChromaCollection() + monkeypatch.setattr( + ArxivStore, "collection", property(lambda self: stub) + ) + return stub + + +# --------------------------------------------------------------------------- +# extract_sections — real pymupdf, synthetic PDFs +# --------------------------------------------------------------------------- + + +class TestExtractSections: + def test_detects_canonical_headings(self, tmp_path): + pdf = tmp_path / "paper.pdf" + _make_synthetic_pdf( + pdf, + [ + ("Introduction", "We study X. 
We find Y."), + ("Methods", "We used Z to evaluate Y."), + ("Results", "Accuracy was 95%."), + ("Conclusion", "X works."), + ], + ) + sections = extract_sections(pdf) + titles = [s.title.lower() for s in sections] + assert "introduction" in titles + assert "methods" in titles + assert "results" in titles + assert "conclusion" in titles + # Body text from each section should be present + intro = next(s for s in sections if s.title.lower() == "introduction") + assert "we study x" in intro.text.lower() + + def test_falls_back_to_whole_paper_when_no_headings(self, tmp_path): + pdf = tmp_path / "no-headings.pdf" + _make_synthetic_pdf( + pdf, + [ + ("Some random title nobody recognizes", "Body text body text."), + ], + ) + sections = extract_sections(pdf) + assert len(sections) == 1 + assert sections[0].title == "Full Paper" + assert "body text" in sections[0].text.lower() + + +# --------------------------------------------------------------------------- +# embed_and_store — uses stub collection + stub embedder +# --------------------------------------------------------------------------- + + +class TestEmbedAndStore: + def test_writes_chunks_and_returns_count(self, store, stub_collection): + sections = [ + Section(index=0, title="Intro", text="aaa", page_start=1, page_end=1), + Section(index=1, title="Methods", text="bbbb", page_start=2, page_end=2), + ] + meta = PaperMetadata(arxiv_id="2403.12345", version="v1", title="Test") + n = embed_and_store( + arxiv_id="2403.12345", + sections=sections, + store=store, + model_name="stub-model", + metadata=meta, + embedder=StubEmbedder(), + ) + assert n == 2 + assert len(stub_collection.docs) == 2 + # Check that chunk ids are model-scoped + expected_ids = {make_chunk_id("2403.12345", i, "stub-model") for i in (0, 1)} + assert set(stub_collection.docs.keys()) == expected_ids + # Metadata round-trips + first = next(iter(stub_collection.docs.values())) + assert first["metadata"]["arxiv_id"] == "2403.12345" + assert 
first["metadata"]["embedding_model"] == "stub-model" + + def test_re_embed_replaces_existing_chunks(self, store, stub_collection): + meta = PaperMetadata(arxiv_id="2403.12345", version="v1", title="Test") + sections_v1 = [ + Section(index=0, title="Intro", text="first", page_start=1, page_end=1), + Section(index=1, title="Methods", text="second", page_start=2, page_end=2), + ] + embed_and_store( + "2403.12345", sections_v1, store, "stub-model", meta, + embedder=StubEmbedder(), + ) + assert len(stub_collection.docs) == 2 + + # Re-embed with fewer sections — should drop the second. + sections_v2 = [ + Section(index=0, title="Intro", text="first", page_start=1, page_end=1), + ] + embed_and_store( + "2403.12345", sections_v2, store, "stub-model", meta, + embedder=StubEmbedder(), + ) + assert len(stub_collection.docs) == 1 + + def test_empty_sections_is_noop(self, store, stub_collection): + meta = PaperMetadata(arxiv_id="x", version="", title="") + n = embed_and_store("x", [], store, "stub-model", meta, embedder=StubEmbedder()) + assert n == 0 + assert stub_collection.docs == {} + + +# --------------------------------------------------------------------------- +# Top-level ingest() — full pipeline with mocked download +# --------------------------------------------------------------------------- + + +def _stub_arxiv_search(arxiv_id: str): + """Return a fake arxiv.Search result for ``arxiv_id``.""" + + def _download_pdf(dirpath=None, filename=None): + # Generate a synthetic PDF on the fly so the rest of the + # pipeline has something real to read. 
+ target = Path(dirpath) / filename + _make_synthetic_pdf( + target, + [ + ("Introduction", "Stub paper introduction."), + ("Methods", "Stub paper methods."), + ("Results", "Stub paper results."), + ], + ) + + paper = SimpleNamespace( + entry_id=f"http://arxiv.org/abs/{arxiv_id}v1", + title=f"Test paper {arxiv_id}", + authors=[SimpleNamespace(name="Alice"), SimpleNamespace(name="Bob")], + published=datetime(2024, 1, 15, tzinfo=timezone.utc), + primary_category="cs.LG", + download_pdf=_download_pdf, + ) + return [paper] + + +class TestIngest: + def test_end_to_end(self, store, stub_collection): + record = ingest( + "2403.12345", + store=store, + model_name="stub-model", + arxiv_search=_stub_arxiv_search, + embedder=StubEmbedder(), + ) + + # Manifest entry + assert isinstance(record, PaperRecord) + assert record.arxiv_id == "2403.12345" + assert record.title == "Test paper 2403.12345" + assert record.authors == ["Alice", "Bob"] + assert record.year == 2024 + assert record.category == "cs.LG" + assert record.chunks_indexed >= 1 + assert record.embedding_model == "stub-model" + + # Manifest persisted to disk + loaded = store.load_manifest() + assert "2403.12345" in loaded + assert loaded["2403.12345"].chunks_indexed == record.chunks_indexed + + # PDF cached + assert (store.pdfs_dir / "2403.12345.pdf").exists() + + # Chunks in stub collection + assert len(stub_collection.docs) == record.chunks_indexed + + def test_idempotent_reingest(self, store, stub_collection): + first = ingest( + "2403.12345", + store=store, + model_name="stub-model", + arxiv_search=_stub_arxiv_search, + embedder=StubEmbedder(), + ) + chunks_after_first = len(stub_collection.docs) + + second = ingest( + "2403.12345", + store=store, + model_name="stub-model", + arxiv_search=_stub_arxiv_search, + embedder=StubEmbedder(), + ) + # Same number of chunks (replace, not append) + assert len(stub_collection.docs) == chunks_after_first + assert second.chunks_indexed == first.chunks_indexed + + def 
test_unknown_arxiv_id_raises(self, store): + with pytest.raises(ValueError, match="not found"): + ingest( + "9999.99999", + store=store, + model_name="stub-model", + arxiv_search=lambda _id: [], + embedder=StubEmbedder(), + ) + + +# --------------------------------------------------------------------------- +# Manifest CRUD via ArxivStore +# --------------------------------------------------------------------------- + + +class TestManifest: + def test_load_returns_empty_dict_when_missing(self, store): + assert store.load_manifest() == {} + + def test_round_trip(self, store): + rec = PaperRecord( + arxiv_id="2401.00001", + version="v2", + title="Round trip test", + authors=["A", "B"], + year=2024, + category="cs.AI", + chunks_indexed=7, + embedding_model="m", + ) + store.upsert_paper(rec) + loaded = store.load_manifest() + assert "2401.00001" in loaded + assert loaded["2401.00001"].title == "Round trip test" + assert loaded["2401.00001"].chunks_indexed == 7 + + def test_remove_paper(self, store): + rec = PaperRecord( + arxiv_id="2401.00001", + version="", + title="t", + authors=[], + year=None, + category=None, + chunks_indexed=0, + embedding_model="m", + ) + store.upsert_paper(rec) + assert store.remove_paper("2401.00001") is True + assert store.load_manifest() == {} + assert store.remove_paper("2401.00001") is False + + def test_list_sorted_newest_first(self, store): + old = PaperRecord( + arxiv_id="old", + version="", + title="old", + authors=[], + year=None, + category=None, + chunks_indexed=0, + embedding_model="m", + added_at="2020-01-01T00:00:00Z", + ) + new = PaperRecord( + arxiv_id="new", + version="", + title="new", + authors=[], + year=None, + category=None, + chunks_indexed=0, + embedding_model="m", + added_at="2026-01-01T00:00:00Z", + ) + store.upsert_paper(old) + store.upsert_paper(new) + listed = store.list_papers() + assert [p.arxiv_id for p in listed] == ["new", "old"] + + +# 
# ---------------------------------------------------------------------------
# CLI smoke (without actually calling chromadb)
# ---------------------------------------------------------------------------


class TestArxivCLI:
    """Smoke tests for the `marchwarden arxiv` subgroup against an empty store."""

    def test_list_empty(self, tmp_path, monkeypatch):
        """`arxiv list` on a fresh store exits 0 with a friendly message."""
        from click.testing import CliRunner

        from cli.main import cli

        # Point the store at a temp dir so the user's real corpus is untouched.
        monkeypatch.setattr(
            "researchers.arxiv.store.DEFAULT_ROOT",
            tmp_path / "arxiv-rag",
        )
        runner = CliRunner()
        result = runner.invoke(cli, ["arxiv", "list"])
        assert result.exit_code == 0, result.output
        assert "No papers indexed" in result.output

    def test_info_missing(self, tmp_path, monkeypatch):
        """`arxiv info` on an unindexed id exits 1 and says so."""
        from click.testing import CliRunner

        from cli.main import cli

        # Point the store at a temp dir so the user's real corpus is untouched.
        monkeypatch.setattr(
            "researchers.arxiv.store.DEFAULT_ROOT",
            tmp_path / "arxiv-rag",
        )
        runner = CliRunner()
        result = runner.invoke(cli, ["arxiv", "info", "0000.00000"])
        assert result.exit_code == 1
        assert "Not indexed" in result.output