Compare commits

...

2 commits

Author SHA1 Message Date
3b57b563ab Merge pull request 'feat(arxiv): ingest pipeline (M5.1.1)' (#58) from feat/arxiv-rag-ingest into main 2026-04-09 02:03:59 +00:00
Jeff Smith
14cfd53514 feat(arxiv): ingest pipeline (M5.1.1)
Closes #38. First sub-milestone of M5.1 (Researcher #2: arxiv-rag).

New package researchers/arxiv/ with three modules:

- store.py — ArxivStore wraps a persistent chromadb collection at
  ~/.marchwarden/arxiv-rag/chroma/ plus a papers.json manifest. Chunk
  ids are deterministic and embedding-model-scoped (per ArxivRagProposal
  decision 4) so re-ingesting with a different embedder doesn't collide
  with prior chunks.
- ingest.py — three-phase pipeline: download_pdf (arxiv API), extract_sections
  (pymupdf with heuristic heading detection + whole-paper fallback), and
  embed_and_store (sentence-transformers, configurable via
  MARCHWARDEN_ARXIV_EMBED_MODEL). Top-level ingest() chains them and
  upserts the manifest entry. Re-ingest is idempotent — chunks for the
  same paper are dropped before re-adding.
- CLI subgroup `marchwarden arxiv add|list|info|remove`. Lazy-imports
  the heavy chromadb / torch deps so non-arxiv commands stay fast.

The heavy ML deps (pymupdf, chromadb, sentence-transformers, arxiv) are
gated behind an optional `[arxiv]` extra so the base install stays slim
for users who only want the web researcher.

Tests: 14 added (141 total passing). Real pymupdf against synthetic PDFs
generated at test time covers extract_sections; chromadb and the
embedder are stubbed via dependency injection so the tests stay fast,
deterministic, and network-free. End-to-end ingest() is exercised with
a mocked arxiv.Search that produces synthetic PDFs.

Out of scope for #38 (covered by later sub-milestones):
- Retrieval / search API (#39)
- ArxivResearcher agent loop (#40)
- MCP server (#41)
- ask --researcher arxiv flag (#42)
- Cost ledger embedding_calls field (#43)

Notes:
- pip install pulled in CUDA torch wheel (~2GB nvidia libs); harmless on
  CPU-only WSL but a future optimization would pin the CPU torch index.
- Live smoke against a real arxiv id deferred so we don't block the M3.3
  collection runner currently using the venv.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 20:03:42 -06:00
6 changed files with 1143 additions and 0 deletions

View file

@ -534,5 +534,134 @@ def costs(
    render_costs(filtered, console)
# ---------------------------------------------------------------------------
# arxiv subgroup (M5.1.1)
# ---------------------------------------------------------------------------
@cli.group()
def arxiv() -> None:
    """Commands for the local arxiv-rag corpus.

    Use the sub-commands to ingest papers by arxiv id, list or inspect
    what is already indexed, and drop entries. Retrieval and search
    land in later sub-milestones (#39+).
    """
@arxiv.command("add")
@click.argument("arxiv_ids", nargs=-1, required=True)
@click.option(
    "--embedding-model",
    default=None,
    help=(
        "Override embedding model. Defaults to "
        "$MARCHWARDEN_ARXIV_EMBED_MODEL or nomic-ai/nomic-embed-text-v1.5."
    ),
)
def arxiv_add(arxiv_ids: tuple[str, ...], embedding_model: Optional[str]) -> None:
    """Download, extract, embed, and index one or more arxiv papers by ID."""
    # Deferred imports: chromadb / torch are expensive to import, so we
    # only pay that cost when an arxiv command actually runs.
    from researchers.arxiv.ingest import DEFAULT_EMBEDDING_MODEL, ingest
    from researchers.arxiv.store import ArxivStore

    console = Console()
    store = ArxivStore()
    chosen = embedding_model if embedding_model else DEFAULT_EMBEDDING_MODEL
    # One paper failing must not abort the rest of the batch.
    for paper_id in arxiv_ids:
        console.print(f"[dim]Ingesting:[/dim] {paper_id} (model={chosen})")
        try:
            record = ingest(paper_id, store=store, model_name=chosen)
        except Exception as exc:
            console.print(f"[bold red]Failed:[/bold red] {paper_id}: {exc}")
            continue
        console.print(
            f" -> [green]ok[/green] {record.title or '(no title)'} "
            f"({record.chunks_indexed} chunks)"
        )
@arxiv.command("list")
def arxiv_list() -> None:
    """Show all indexed arxiv papers."""
    from researchers.arxiv.store import ArxivStore

    console = Console()
    papers = ArxivStore().list_papers()
    if not papers:
        console.print(
            "[dim]No papers indexed yet. Use[/dim] "
            "[bold]marchwarden arxiv add <id>[/bold]"
        )
        return
    # One row per manifest entry; list_papers already sorts newest first.
    table = Table(title=f"Indexed papers ({len(papers)})", show_lines=False, expand=True)
    table.add_column("arxiv_id", style="cyan")
    table.add_column("Title", overflow="fold")
    table.add_column("Year", justify="right", width=6)
    table.add_column("Chunks", justify="right", width=6)
    table.add_column("Model", overflow="fold")
    for entry in papers:
        year_text = str(entry.year) if entry.year else ""
        table.add_row(
            entry.arxiv_id,
            entry.title or "(no title)",
            year_text,
            str(entry.chunks_indexed),
            entry.embedding_model,
        )
    console.print(table)
@arxiv.command("info")
@click.argument("arxiv_id")
def arxiv_info(arxiv_id: str) -> None:
    """Show metadata + chunk count for one indexed paper."""
    from researchers.arxiv.store import ArxivStore

    console = Console()
    record = ArxivStore().get_paper(arxiv_id)
    if record is None:
        console.print(
            f"[bold red]Not indexed:[/bold red] {arxiv_id}. "
            f"Use [bold]marchwarden arxiv add {arxiv_id}[/bold]."
        )
        sys.exit(1)
    body = Text()
    # First line bold, remaining fields plain — same rendering as a
    # hand-written sequence of append calls.
    body.append(f"arxiv_id: {record.arxiv_id}\n", style="bold")
    for label, value in (
        ("title", record.title or "(none)"),
        ("authors", ", ".join(record.authors) or "(none)"),
        ("year", record.year or "(unknown)"),
        ("category", record.category or "(unknown)"),
        ("chunks", record.chunks_indexed),
        ("embedding_model", record.embedding_model),
        ("added_at", record.added_at),
    ):
        body.append(f"{label}: {value}\n")
    console.print(Panel(body, title=arxiv_id, border_style="cyan"))
@arxiv.command("remove")
@click.argument("arxiv_id")
def arxiv_remove(arxiv_id: str) -> None:
    """Drop one paper from the manifest and chromadb collection."""
    from researchers.arxiv.store import ArxivStore

    console = Console()
    store = ArxivStore()
    # Chunks and the manifest entry are removed independently; either
    # one existing counts as "found".
    dropped = store.delete_paper(arxiv_id)
    was_listed = store.remove_paper(arxiv_id)
    if dropped == 0 and not was_listed:
        console.print(f"[yellow]Not found:[/yellow] {arxiv_id}")
        sys.exit(1)
    console.print(
        f"[green]Removed[/green] {arxiv_id} "
        f"({dropped} chunks dropped)"
    )
if __name__ == "__main__":
    cli()

View file

@ -32,6 +32,14 @@ dev = [
    "ruff>=0.1.0",
    "mypy>=1.0",
]
# arxiv-rag researcher (M5.1). Heavy ML deps — optional extra so the base
# install stays slim for users who only want the web researcher.
arxiv = [
"pymupdf>=1.24",
"chromadb>=0.5",
"sentence-transformers>=3.0",
"arxiv>=2.1",
]
[project.scripts]
marchwarden = "cli.main:cli"

View file

@ -0,0 +1,7 @@
"""arxiv-rag researcher.

Second researcher implementation in Marchwarden. M5.1.1 is the ingest
pipeline only — see researchers.arxiv.ingest and researchers.arxiv.store.
The agent loop, MCP server, and ResearchContract integration ship in
later sub-milestones (#39–#43).
"""

370
researchers/arxiv/ingest.py Normal file
View file

@ -0,0 +1,370 @@
"""Ingest pipeline for the arxiv-rag researcher.

Public surface:

    download_pdf(arxiv_id, store) -> Path
    extract_sections(pdf_path) -> list[Section]
    embed_and_store(arxiv_id, sections, store, model_name, metadata) -> int
    ingest(arxiv_id, store=None, model_name=...) -> PaperRecord  # one-shot

The split exists so unit tests can mock each phase independently. The
top-level ``ingest()`` is what the CLI calls.

Section detection is heuristic: we walk the PDF page by page, look for
short lines that match a small set of canonical academic headings
(introduction, methods, results, discussion, conclusion, references,
etc.), and use those as section boundaries. If nothing matches, we fall
back to one Section containing the entire paper text — citations to
that section will still be valid, just less precise.
"""
from __future__ import annotations

import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional

from .store import ArxivStore, PaperRecord, make_chunk_id

# ---------------------------------------------------------------------------
# Defaults
# ---------------------------------------------------------------------------

# Read from the environment once at import time; per-invocation override
# goes through the CLI --embedding-model flag instead.
DEFAULT_EMBEDDING_MODEL = os.environ.get(
    "MARCHWARDEN_ARXIV_EMBED_MODEL",
    "nomic-ai/nomic-embed-text-v1.5",
)

# Headings considered "section starters" for the heuristic. Order
# matters only for documentation; matching is case-insensitive and
# whole-line.
_SECTION_HEADINGS = [
    "abstract",
    "introduction",
    "background",
    "related work",
    "preliminaries",
    "methods",
    "method",
    "methodology",
    "approach",
    "model",
    "experiments",
    "experimental setup",
    "evaluation",
    "results",
    "discussion",
    "analysis",
    "limitations",
    "conclusion",
    "conclusions",
    "future work",
    "references",
    "acknowledgments",
    "appendix",
]

# Compiled match: optional leading number ("3", "3.1", "III"), optional
# trailing punctuation, the heading word, end of line. The $ anchor
# keeps prefix headings ("conclusion" vs "conclusions") unambiguous.
_HEADING_RE = re.compile(
    r"^\s*(?:[0-9IVX]+\.?[0-9.]*)?\s*(?P<title>" + "|".join(_SECTION_HEADINGS) + r")\s*$",
    re.IGNORECASE,
)
@dataclass
class Section:
    """One section of a paper, as produced by extract_sections."""

    index: int  # 0-based position of the section within the paper
    title: str  # normalized heading text ("Full Paper" for the fallback)
    text: str  # body text; the heading line itself is excluded
    page_start: int  # 1-based page where the heading was found
    page_end: int  # 1-based page of the last body line
@dataclass
class PaperMetadata:
    """Lightweight metadata extracted from arxiv at download time."""

    arxiv_id: str
    version: str  # numeric suffix parsed from entry_id; "" when unknown
    title: str
    authors: list[str] = field(default_factory=list)
    year: Optional[int] = None  # publication year, if arxiv reported one
    category: Optional[str] = None  # arxiv primary category, e.g. "cs.LG"
# ---------------------------------------------------------------------------
# Phase 1 — download
# ---------------------------------------------------------------------------
def download_pdf(
    arxiv_id: str,
    store: ArxivStore,
    *,
    arxiv_search: Optional[Callable] = None,
) -> tuple[Path, PaperMetadata]:
    """Download a paper PDF and return its cached path + arxiv metadata.

    Parameters
    ----------
    arxiv_id:
        The arxiv identifier, e.g. ``"2403.12345"``.
    store:
        Provides ``pdfs_dir``, the cache directory for downloaded PDFs.
    arxiv_search:
        Injectable for tests so we can avoid hitting the real arxiv
        API; takes the arxiv id and returns an iterable of result
        objects. The default uses the ``arxiv`` package.

    Raises
    ------
    ValueError
        If the arxiv API returns no result for ``arxiv_id``.
    """
    target = store.pdfs_dir / f"{arxiv_id}.pdf"
    if arxiv_search is None:
        import arxiv as arxiv_pkg

        search = arxiv_pkg.Search(id_list=[arxiv_id])
        results = list(search.results())
    else:
        results = list(arxiv_search(arxiv_id))
    if not results:
        raise ValueError(f"arxiv id not found: {arxiv_id}")
    paper = results[0]
    # Download the PDF if we don't already have it cached.
    if not target.exists():
        # Both the real arxiv.Result and our test stub expose
        # download_pdf(dirpath, filename). Test stubs may also accept a
        # destination Path directly — try that first, fall back.
        try:
            paper.download_pdf(
                dirpath=str(store.pdfs_dir),
                filename=f"{arxiv_id}.pdf",
            )
        except TypeError:
            paper.download_pdf(str(target))
    # Version is the numeric suffix of the entry URL, e.g.
    # ".../abs/2403.12345v2" -> "2". The previous `"v" in entry_id`
    # check was always true for arxiv URLs (the "v" in "arxiv"), so an
    # entry_id without a version suffix produced a URL fragment as the
    # version; anchoring the match at the end fixes that while keeping
    # the same result for suffixed ids.
    entry_id = getattr(paper, "entry_id", "") or ""
    version_match = re.search(r"v(\d+)$", entry_id)
    published = getattr(paper, "published", None)
    metadata = PaperMetadata(
        arxiv_id=arxiv_id,
        version=version_match.group(1) if version_match else "",
        title=getattr(paper, "title", "") or "",
        authors=[
            getattr(a, "name", str(a))
            for a in (getattr(paper, "authors", []) or [])
        ],
        year=published.year if published is not None else None,
        category=getattr(paper, "primary_category", None),
    )
    return target, metadata
# ---------------------------------------------------------------------------
# Phase 2 — extract sections
# ---------------------------------------------------------------------------
def extract_sections(pdf_path: Path) -> list[Section]:
    """Extract section-level chunks from a PDF.

    Heuristic: walk pages, split on lines that match a known section
    heading (``_HEADING_RE``). If no headings are detected, return one
    Section containing the whole document so the paper is still
    indexable, just at coarser granularity.

    Returns an empty list only when the PDF yields no text at all.
    """
    # Imported lazily, matching the CLI's pattern of keeping heavy deps
    # off the default import path.
    import pymupdf

    doc = pymupdf.open(str(pdf_path))
    try:
        # Build a flat list of (page_num, line) tuples for the whole doc.
        # Pages are 1-based; blank lines are dropped up front so later
        # indices always refer to real text lines.
        lines: list[tuple[int, str]] = []
        for page_num, page in enumerate(doc, start=1):
            text = page.get_text("text") or ""
            for raw_line in text.splitlines():
                stripped = raw_line.strip()
                if stripped:
                    lines.append((page_num, stripped))
    finally:
        doc.close()
    # Find heading boundaries.
    boundaries: list[tuple[int, str, int]] = []  # (line_index, title, page_num)
    for i, (page_num, line) in enumerate(lines):
        if len(line) > 80:
            # Section headings are short. Skip likely body text.
            continue
        m = _HEADING_RE.match(line)
        if m:
            # .title() normalizes "RESULTS" / "results" -> "Results".
            boundaries.append((i, m.group("title").strip().title(), page_num))
    sections: list[Section] = []
    if not boundaries:
        # Fallback: whole paper as one section.
        full_text = "\n".join(line for _, line in lines)
        if not full_text.strip():
            return []
        first_page = lines[0][0] if lines else 1
        last_page = lines[-1][0] if lines else 1
        return [
            Section(
                index=0,
                title="Full Paper",
                text=full_text,
                page_start=first_page,
                page_end=last_page,
            )
        ]
    # Build sections between consecutive boundaries. The heading line
    # itself is excluded from the body (start_line + 1).
    for idx, (start_line, title, page_start) in enumerate(boundaries):
        end_line = (
            boundaries[idx + 1][0] if idx + 1 < len(boundaries) else len(lines)
        )
        body_lines = lines[start_line + 1 : end_line]
        text = "\n".join(line for _, line in body_lines).strip()
        if not text:
            # Back-to-back headings (e.g. table-of-contents artifacts)
            # yield empty bodies; skip rather than index empty chunks.
            continue
        page_end = body_lines[-1][0] if body_lines else page_start
        sections.append(
            Section(
                index=idx,
                title=title,
                text=text,
                page_start=page_start,
                page_end=page_end,
            )
        )
    if not sections:
        # Headings detected but every section was empty — fall back to
        # whole paper rather than dropping the document.
        full_text = "\n".join(line for _, line in lines)
        return [
            Section(
                index=0,
                title="Full Paper",
                text=full_text,
                page_start=lines[0][0],
                page_end=lines[-1][0],
            )
        ]
    return sections
# ---------------------------------------------------------------------------
# Phase 3 — embed and store
# ---------------------------------------------------------------------------
def _load_embedder(model_name: str):
    """Return a process-wide cached sentence-transformers embedder.

    The cache lives on the function object, so repeated ingests in the
    same process don't re-download or re-load model weights.
    """
    cache = _load_embedder._cache  # type: ignore[attr-defined]
    try:
        return cache[model_name]
    except KeyError:
        pass
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer(model_name, trust_remote_code=True)
    cache[model_name] = model
    return model


_load_embedder._cache = {}  # type: ignore[attr-defined]
def embed_and_store(
    arxiv_id: str,
    sections: list[Section],
    store: ArxivStore,
    model_name: str,
    metadata: PaperMetadata,
    *,
    embedder: Optional[object] = None,
) -> int:
    """Embed each section and write it to the chromadb collection.

    ``embedder`` is injectable for tests so sentence-transformers never
    has to load; it must expose ``encode(list[str]) -> list[list[float]]``.

    Returns the number of chunks written.
    """
    if not sections:
        return 0
    active = embedder if embedder is not None else _load_embedder(model_name)
    texts = [section.text for section in sections]
    raw_vectors = active.encode(texts)
    # sentence-transformers hands back a numpy.ndarray; chromadb wants
    # plain lists. Normalize either shape.
    vectors = [
        vec.tolist() if hasattr(vec, "tolist") else list(vec)
        for vec in raw_vectors
    ]
    chunk_ids = [
        make_chunk_id(arxiv_id, section.index, model_name) for section in sections
    ]
    chunk_meta = [
        {
            "arxiv_id": arxiv_id,
            "section_index": section.index,
            "section_title": section.title,
            "page_start": section.page_start,
            "page_end": section.page_end,
            "title": metadata.title,
            "embedding_model": model_name,
        }
        for section in sections
    ]
    # Replace any prior chunks for this paper before re-adding, so
    # re-ingesting with the same model is a no-op in observable state.
    store.delete_paper(arxiv_id)
    store.add_chunks(
        ids=chunk_ids, documents=texts, embeddings=vectors, metadatas=chunk_meta
    )
    return len(chunk_ids)
# ---------------------------------------------------------------------------
# Top-level orchestrator
# ---------------------------------------------------------------------------
def ingest(
    arxiv_id: str,
    store: Optional[ArxivStore] = None,
    *,
    model_name: str = DEFAULT_EMBEDDING_MODEL,
    arxiv_search: Optional[Callable] = None,
    embedder: Optional[object] = None,
) -> PaperRecord:
    """End-to-end ingest: download → extract → embed → store → manifest.

    ``arxiv_search`` and ``embedder`` are test seams passed straight
    through to the corresponding phase.
    """
    if store is None:
        store = ArxivStore()
    pdf_path, metadata = download_pdf(arxiv_id, store, arxiv_search=arxiv_search)
    chunk_total = embed_and_store(
        arxiv_id=arxiv_id,
        sections=extract_sections(pdf_path),
        store=store,
        model_name=model_name,
        metadata=metadata,
        embedder=embedder,
    )
    record = PaperRecord(
        arxiv_id=arxiv_id,
        version=metadata.version,
        title=metadata.title,
        authors=metadata.authors,
        year=metadata.year,
        category=metadata.category,
        chunks_indexed=chunk_total,
        embedding_model=model_name,
    )
    store.upsert_paper(record)
    return record

214
researchers/arxiv/store.py Normal file
View file

@ -0,0 +1,214 @@
"""Chromadb wrapper for the arxiv-rag researcher.
The store lives at ``~/.marchwarden/arxiv-rag/`` and contains:
- ``papers.json`` manifest mapping arxiv_id -> metadata
- ``pdfs/<id>.pdf`` cached PDFs
- ``chroma/`` chromadb persistent collection of embedded chunks
This module is intentionally narrow: it exposes the persistent state and
the operations the ingest + retrieval layers need (add chunks, fetch
manifest, list papers). The retrieval layer (#39) will add a query API
on top of the same collection.
Chunk IDs are deterministic and include the embedding model name in
their hash so re-ingesting a paper with a different embedder creates a
new ID space rather than silently overwriting old citations
(ArxivRagProposal §1, decision 4).
"""
from __future__ import annotations
import hashlib
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable, Optional
# On-disk root for everything the arxiv researcher persists (manifest,
# cached PDFs, chroma collection). Tests monkeypatch this module
# attribute to point at a temp directory.
DEFAULT_ROOT = Path(os.path.expanduser("~/.marchwarden/arxiv-rag"))
DEFAULT_COLLECTION = "arxiv_chunks"
@dataclass
class PaperRecord:
    """Manifest entry for one indexed paper."""

    arxiv_id: str
    version: str
    title: str
    authors: list[str]
    year: Optional[int]
    category: Optional[str]
    chunks_indexed: int
    embedding_model: str
    # UTC timestamp like "2026-01-01T00:00:00Z", stamped at creation.
    added_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc)
        .isoformat(timespec="seconds")
        .replace("+00:00", "Z")
    )

    def to_dict(self) -> dict:
        """Plain-dict form for papers.json (arxiv_id lives in the key, not the value)."""
        names = (
            "version",
            "title",
            "authors",
            "year",
            "category",
            "chunks_indexed",
            "embedding_model",
            "added_at",
        )
        payload = {name: getattr(self, name) for name in names}
        payload["authors"] = list(self.authors)  # defensive copy
        return payload

    @classmethod
    def from_dict(cls, arxiv_id: str, data: dict) -> "PaperRecord":
        """Inverse of to_dict; tolerant of missing keys in older manifests."""
        return cls(
            arxiv_id,
            data.get("version", ""),
            data.get("title", ""),
            list(data.get("authors", [])),
            data.get("year"),
            data.get("category"),
            int(data.get("chunks_indexed", 0)),
            data.get("embedding_model", ""),
            data.get("added_at", ""),
        )
def make_chunk_id(arxiv_id: str, section_index: int, embedding_model: str) -> str:
    """Deterministic chunk id, scoped by embedding model.

    Format: ``<arxiv_id>::<section_index>::<sha1(model)[0:8]>``. The
    short model hash keeps the id readable while separating the id
    spaces of different embedding models — per ArxivRagProposal §1
    decision 4, re-ingesting with a different model must not collide
    with prior chunks.
    """
    digest = hashlib.sha1(embedding_model.encode("utf-8")).hexdigest()
    return "::".join((arxiv_id, f"{section_index:04d}", digest[:8]))
class ArxivStore:
    """File-backed manifest + chromadb collection for indexed papers.

    Layout under ``root`` (default ``~/.marchwarden/arxiv-rag``):

    - ``papers.json``  manifest mapping arxiv_id -> PaperRecord
    - ``pdfs/``        cached PDF downloads
    - ``chroma/``      persistent chromadb collection of embedded chunks

    chromadb is imported lazily (see ``collection``), so constructing a
    store stays cheap for commands that only read the manifest.
    """

    def __init__(
        self,
        root: Optional[Path] = None,
        collection_name: str = DEFAULT_COLLECTION,
    ):
        # DEFAULT_ROOT is read at call time (not bound as a parameter
        # default), so tests can monkeypatch the module attribute
        # before constructing a store.
        self.root = Path(root) if root else DEFAULT_ROOT
        self.pdfs_dir = self.root / "pdfs"
        self.chroma_dir = self.root / "chroma"
        self.manifest_path = self.root / "papers.json"
        self.collection_name = collection_name
        self.root.mkdir(parents=True, exist_ok=True)
        self.pdfs_dir.mkdir(parents=True, exist_ok=True)
        self.chroma_dir.mkdir(parents=True, exist_ok=True)
        self._client = None  # lazy
        self._collection = None  # lazy

    # ------------------------------------------------------------------
    # Chroma — lazy because importing chromadb is slow
    # ------------------------------------------------------------------
    @property
    def collection(self):
        """Lazy chromadb collection handle.

        First access imports chromadb, opens the persistent client, and
        creates the collection if needed; later accesses reuse the
        cached handle. Tests monkeypatch this property with an
        in-memory stub to avoid chromadb entirely.
        """
        if self._collection is None:
            import chromadb

            self._client = chromadb.PersistentClient(path=str(self.chroma_dir))
            self._collection = self._client.get_or_create_collection(
                name=self.collection_name,
                # Cosine distance — typical for sentence-transformer
                # embeddings normalized to unit length.
                metadata={"hnsw:space": "cosine"},
            )
        return self._collection

    def add_chunks(
        self,
        ids: list[str],
        documents: list[str],
        embeddings: list[list[float]],
        metadatas: list[dict[str, Any]],
    ) -> None:
        """Add a batch of embedded chunks to the collection.

        The four lists are parallel (same index = same chunk); a length
        mismatch raises ValueError before anything is written. Uses
        upsert, so re-adding an existing id replaces it.
        """
        if not ids:
            return
        if not (len(ids) == len(documents) == len(embeddings) == len(metadatas)):
            raise ValueError(
                "ids/documents/embeddings/metadatas must all have the same length"
            )
        self.collection.upsert(
            ids=ids,
            documents=documents,
            embeddings=embeddings,
            metadatas=metadatas,
        )

    def chunk_count_for(self, arxiv_id: str) -> int:
        """Number of chunks currently stored for one paper."""
        # chromadb's get() with a where filter returns the matching docs;
        # we just need the count. Best-effort: any backend error is
        # reported as zero rather than propagated to the caller.
        try:
            res = self.collection.get(where={"arxiv_id": arxiv_id})
        except Exception:
            return 0
        return len(res.get("ids", []))

    def delete_paper(self, arxiv_id: str) -> int:
        """Remove all chunks for one paper. Returns number deleted."""
        # Count first so the caller can report what was dropped; skip
        # the delete call entirely when there is nothing to remove.
        before = self.chunk_count_for(arxiv_id)
        if before == 0:
            return 0
        self.collection.delete(where={"arxiv_id": arxiv_id})
        return before

    # ------------------------------------------------------------------
    # Manifest — plain JSON, atomic write
    # ------------------------------------------------------------------
    def load_manifest(self) -> dict[str, PaperRecord]:
        """Read papers.json. Returns {} if missing."""
        if not self.manifest_path.exists():
            return {}
        data = json.loads(self.manifest_path.read_text(encoding="utf-8"))
        return {
            arxiv_id: PaperRecord.from_dict(arxiv_id, entry)
            for arxiv_id, entry in data.items()
        }

    def save_manifest(self, manifest: dict[str, PaperRecord]) -> None:
        """Write papers.json atomically (temp file + rename)."""
        payload = {arxiv_id: rec.to_dict() for arxiv_id, rec in manifest.items()}
        tmp = self.manifest_path.with_suffix(".json.tmp")
        tmp.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
        # Path.replace renames over the target, so readers never see a
        # partially-written manifest.
        tmp.replace(self.manifest_path)

    def upsert_paper(self, record: PaperRecord) -> None:
        """Insert or replace one entry in the manifest."""
        manifest = self.load_manifest()
        manifest[record.arxiv_id] = record
        self.save_manifest(manifest)

    def remove_paper(self, arxiv_id: str) -> bool:
        """Drop one entry from the manifest. Returns True if removed."""
        manifest = self.load_manifest()
        if arxiv_id not in manifest:
            return False
        del manifest[arxiv_id]
        self.save_manifest(manifest)
        return True

    def list_papers(self) -> list[PaperRecord]:
        """All manifest entries, sorted by added_at descending (newest first)."""
        manifest = self.load_manifest()
        return sorted(manifest.values(), key=lambda r: r.added_at, reverse=True)

    def get_paper(self, arxiv_id: str) -> Optional[PaperRecord]:
        """Manifest entry for one paper, or None."""
        return self.load_manifest().get(arxiv_id)

415
tests/test_arxiv_ingest.py Normal file
View file

@ -0,0 +1,415 @@
"""Tests for the arxiv-rag ingest pipeline (M5.1.1).
Strategy: mock the slow / network bits (arxiv API, embedder, chromadb)
and exercise the real pipeline against synthetic PDFs generated with
pymupdf at test time. This keeps the tests deterministic, fast, and
network-free while still exercising the actual extract_sections logic.
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from researchers.arxiv import ingest as ingest_mod
from researchers.arxiv.ingest import (
PaperMetadata,
Section,
embed_and_store,
extract_sections,
ingest,
)
from researchers.arxiv.store import ArxivStore, PaperRecord, make_chunk_id
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_synthetic_pdf(path: Path, sections: list[tuple[str, str]]) -> None:
    """Build a tiny PDF with one page per (heading, body) tuple.

    pymupdf is already a hard dep of the arxiv extra, so synthesizing a
    fixture PDF inline is cheaper than checking a binary into the repo.
    """
    import pymupdf

    doc = pymupdf.open()
    for heading, body in sections:
        page = doc.new_page()
        page.insert_text((50, 80), heading, fontsize=14)
        # Lay the body out line by line so the page reads like prose.
        for offset, line in enumerate(body.split("\n")):
            page.insert_text((50, 110 + 16 * offset), line, fontsize=11)
    doc.save(str(path))
    doc.close()
@pytest.fixture
def store(tmp_path):
    """ArxivStore rooted in a temp directory (never touches ~/.marchwarden)."""
    return ArxivStore(root=tmp_path / "arxiv-rag")
class StubEmbedder:
    """Minimal stand-in for sentence-transformers.SentenceTransformer.

    Records every batch handed to encode() and returns deterministic
    4-dim vectors keyed off text length, so two different sections
    produce two different embeddings without loading any model.
    """

    def __init__(self, dim: int = 4):
        self.dim = dim
        self.calls: list[list[str]] = []

    def encode(self, texts):
        batch = list(texts)
        self.calls.append(batch)
        return [[float(len(item)), 0.0, 0.0, 0.0] for item in batch]
class StubChromaCollection:
    """In-memory drop-in for a chromadb collection.

    Rows are keyed by chunk id; get()/delete() support the single
    metadata-equality ``where`` filter shape the store layer uses.
    """

    def __init__(self):
        self.docs: dict[str, dict] = {}

    def _matches(self, entry, where) -> bool:
        # Every where-clause key must equal the entry's metadata value.
        return all(entry["metadata"].get(key) == val for key, val in where.items())

    def upsert(self, ids, documents, embeddings, metadatas):
        for chunk_id, document, embedding, meta in zip(ids, documents, embeddings, metadatas):
            self.docs[chunk_id] = {
                "document": document,
                "embedding": embedding,
                "metadata": meta,
            }

    def get(self, where=None):
        if where is None:
            return {"ids": list(self.docs)}
        return {
            "ids": [
                chunk_id
                for chunk_id, entry in self.docs.items()
                if self._matches(entry, where)
            ]
        }

    def delete(self, where):
        doomed = [
            chunk_id
            for chunk_id, entry in self.docs.items()
            if self._matches(entry, where)
        ]
        for chunk_id in doomed:
            del self.docs[chunk_id]
@pytest.fixture
def stub_collection(monkeypatch):
    """Replace ArxivStore.collection with an in-memory stub.

    Patches the property on the class, so every ArxivStore built during
    the test shares one stub — assertions can inspect exactly what the
    store wrote without importing chromadb.
    """
    stub = StubChromaCollection()
    monkeypatch.setattr(
        ArxivStore, "collection", property(lambda self: stub)
    )
    return stub
# ---------------------------------------------------------------------------
# extract_sections — real pymupdf, synthetic PDFs
# ---------------------------------------------------------------------------
class TestExtractSections:
    """extract_sections exercised against real pymupdf + synthetic PDFs."""

    def test_detects_canonical_headings(self, tmp_path):
        pdf_path = tmp_path / "paper.pdf"
        _make_synthetic_pdf(
            pdf_path,
            [
                ("Introduction", "We study X. We find Y."),
                ("Methods", "We used Z to evaluate Y."),
                ("Results", "Accuracy was 95%."),
                ("Conclusion", "X works."),
            ],
        )
        sections = extract_sections(pdf_path)
        titles = {s.title.lower() for s in sections}
        assert {"introduction", "methods", "results", "conclusion"} <= titles
        # Body text from each section should survive extraction.
        intro = next(s for s in sections if s.title.lower() == "introduction")
        assert "we study x" in intro.text.lower()

    def test_falls_back_to_whole_paper_when_no_headings(self, tmp_path):
        pdf_path = tmp_path / "no-headings.pdf"
        _make_synthetic_pdf(
            pdf_path,
            [("Some random title nobody recognizes", "Body text body text.")],
        )
        sections = extract_sections(pdf_path)
        assert len(sections) == 1
        only = sections[0]
        assert only.title == "Full Paper"
        assert "body text" in only.text.lower()
# ---------------------------------------------------------------------------
# embed_and_store — uses stub collection + stub embedder
# ---------------------------------------------------------------------------
class TestEmbedAndStore:
    """embed_and_store against the in-memory collection + stub embedder."""

    def test_writes_chunks_and_returns_count(self, store, stub_collection):
        sections = [
            Section(index=0, title="Intro", text="aaa", page_start=1, page_end=1),
            Section(index=1, title="Methods", text="bbbb", page_start=2, page_end=2),
        ]
        meta = PaperMetadata(arxiv_id="2403.12345", version="v1", title="Test")
        written = embed_and_store(
            arxiv_id="2403.12345",
            sections=sections,
            store=store,
            model_name="stub-model",
            metadata=meta,
            embedder=StubEmbedder(),
        )
        assert written == 2
        assert len(stub_collection.docs) == 2
        # Chunk ids must be model-scoped.
        assert set(stub_collection.docs) == {
            make_chunk_id("2403.12345", index, "stub-model") for index in (0, 1)
        }
        # Metadata round-trips into the collection.
        sample = next(iter(stub_collection.docs.values()))
        assert sample["metadata"]["arxiv_id"] == "2403.12345"
        assert sample["metadata"]["embedding_model"] == "stub-model"

    def test_re_embed_replaces_existing_chunks(self, store, stub_collection):
        meta = PaperMetadata(arxiv_id="2403.12345", version="v1", title="Test")

        def run(sections):
            embed_and_store(
                "2403.12345", sections, store, "stub-model", meta,
                embedder=StubEmbedder(),
            )

        run([
            Section(index=0, title="Intro", text="first", page_start=1, page_end=1),
            Section(index=1, title="Methods", text="second", page_start=2, page_end=2),
        ])
        assert len(stub_collection.docs) == 2
        # Fewer sections on re-embed — the stale second chunk must go.
        run([
            Section(index=0, title="Intro", text="first", page_start=1, page_end=1),
        ])
        assert len(stub_collection.docs) == 1

    def test_empty_sections_is_noop(self, store, stub_collection):
        meta = PaperMetadata(arxiv_id="x", version="", title="")
        written = embed_and_store(
            "x", [], store, "stub-model", meta, embedder=StubEmbedder()
        )
        assert written == 0
        assert stub_collection.docs == {}
# ---------------------------------------------------------------------------
# Top-level ingest() — full pipeline with mocked download
# ---------------------------------------------------------------------------
def _stub_arxiv_search(arxiv_id: str):
    """Return a fake arxiv.Search result for ``arxiv_id``."""

    def _download_pdf(dirpath=None, filename=None):
        # Write a synthetic PDF on the fly so the downstream extract
        # phase has a real document to parse.
        _make_synthetic_pdf(
            Path(dirpath) / filename,
            [
                ("Introduction", "Stub paper introduction."),
                ("Methods", "Stub paper methods."),
                ("Results", "Stub paper results."),
            ],
        )

    return [
        SimpleNamespace(
            entry_id=f"http://arxiv.org/abs/{arxiv_id}v1",
            title=f"Test paper {arxiv_id}",
            authors=[SimpleNamespace(name="Alice"), SimpleNamespace(name="Bob")],
            published=datetime(2024, 1, 15, tzinfo=timezone.utc),
            primary_category="cs.LG",
            download_pdf=_download_pdf,
        )
    ]
class TestIngest:
    """Full pipeline with a mocked arxiv API and stub embedder."""

    def _run(self, store):
        # Shared happy-path invocation used by the tests below.
        return ingest(
            "2403.12345",
            store=store,
            model_name="stub-model",
            arxiv_search=_stub_arxiv_search,
            embedder=StubEmbedder(),
        )

    def test_end_to_end(self, store, stub_collection):
        record = self._run(store)
        # Manifest entry fields come straight from the stubbed API.
        assert isinstance(record, PaperRecord)
        assert record.arxiv_id == "2403.12345"
        assert record.title == "Test paper 2403.12345"
        assert record.authors == ["Alice", "Bob"]
        assert record.year == 2024
        assert record.category == "cs.LG"
        assert record.chunks_indexed >= 1
        assert record.embedding_model == "stub-model"
        # Manifest persisted to disk.
        loaded = store.load_manifest()
        assert "2403.12345" in loaded
        assert loaded["2403.12345"].chunks_indexed == record.chunks_indexed
        # PDF cached under the store root.
        assert (store.pdfs_dir / "2403.12345.pdf").exists()
        # Every indexed chunk landed in the (stub) collection.
        assert len(stub_collection.docs) == record.chunks_indexed

    def test_idempotent_reingest(self, store, stub_collection):
        first = self._run(store)
        count_after_first = len(stub_collection.docs)
        second = self._run(store)
        # Replace, not append: the chunk count stays flat.
        assert len(stub_collection.docs) == count_after_first
        assert second.chunks_indexed == first.chunks_indexed

    def test_unknown_arxiv_id_raises(self, store):
        with pytest.raises(ValueError, match="not found"):
            ingest(
                "9999.99999",
                store=store,
                model_name="stub-model",
                arxiv_search=lambda _id: [],
                embedder=StubEmbedder(),
            )
# ---------------------------------------------------------------------------
# Manifest CRUD via ArxivStore
# ---------------------------------------------------------------------------
class TestManifest:
    """ArxivStore manifest CRUD (papers.json round-trips)."""

    @staticmethod
    def _record(arxiv_id: str, **overrides) -> PaperRecord:
        # Minimal valid record; each test overrides only what it asserts on.
        base = dict(
            arxiv_id=arxiv_id,
            version="",
            title="t",
            authors=[],
            year=None,
            category=None,
            chunks_indexed=0,
            embedding_model="m",
        )
        base.update(overrides)
        return PaperRecord(**base)

    def test_load_returns_empty_dict_when_missing(self, store):
        assert store.load_manifest() == {}

    def test_round_trip(self, store):
        store.upsert_paper(
            self._record(
                "2401.00001",
                version="v2",
                title="Round trip test",
                authors=["A", "B"],
                year=2024,
                category="cs.AI",
                chunks_indexed=7,
            )
        )
        loaded = store.load_manifest()
        assert "2401.00001" in loaded
        assert loaded["2401.00001"].title == "Round trip test"
        assert loaded["2401.00001"].chunks_indexed == 7

    def test_remove_paper(self, store):
        store.upsert_paper(self._record("2401.00001"))
        assert store.remove_paper("2401.00001") is True
        assert store.load_manifest() == {}
        # Removing again reports "not present".
        assert store.remove_paper("2401.00001") is False

    def test_list_sorted_newest_first(self, store):
        store.upsert_paper(
            self._record("old", title="old", added_at="2020-01-01T00:00:00Z")
        )
        store.upsert_paper(
            self._record("new", title="new", added_at="2026-01-01T00:00:00Z")
        )
        assert [p.arxiv_id for p in store.list_papers()] == ["new", "old"]
# ---------------------------------------------------------------------------
# CLI smoke (without actually calling chromadb)
# ---------------------------------------------------------------------------
class TestArxivCLI:
    """CLI smoke tests — no chromadb, store rooted in tmp_path."""

    @staticmethod
    def _invoke(tmp_path, monkeypatch, args):
        from click.testing import CliRunner
        from cli.main import cli

        # Point the store at a throwaway root so the user's real corpus
        # is never touched.
        monkeypatch.setattr(
            "researchers.arxiv.store.DEFAULT_ROOT",
            tmp_path / "arxiv-rag",
        )
        return CliRunner().invoke(cli, args)

    def test_list_empty(self, tmp_path, monkeypatch):
        result = self._invoke(tmp_path, monkeypatch, ["arxiv", "list"])
        assert result.exit_code == 0, result.output
        assert "No papers indexed" in result.output

    def test_info_missing(self, tmp_path, monkeypatch):
        result = self._invoke(tmp_path, monkeypatch, ["arxiv", "info", "0000.00000"])
        assert result.exit_code == 1
        assert "Not indexed" in result.output