feat(arxiv): ingest pipeline (M5.1.1) #58
6 changed files with 1143 additions and 0 deletions
129
cli/main.py
129
cli/main.py
|
|
@ -534,5 +534,134 @@ def costs(
|
|||
render_costs(filtered, console)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# arxiv subgroup (M5.1.1)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@cli.group()
def arxiv() -> None:
    """Manage the local arxiv-rag corpus.

    Sub-commands let you ingest arxiv papers, list what's indexed, and
    inspect individual entries. Retrieval and search ship in #39+.
    """
    # NOTE: intentionally empty — click invokes the selected sub-command.
    # The docstring above doubles as the `marchwarden arxiv --help` text.
|
||||
|
||||
|
||||
@arxiv.command("add")
@click.argument("arxiv_ids", nargs=-1, required=True)
@click.option(
    "--embedding-model",
    default=None,
    help=(
        "Override embedding model. Defaults to "
        "$MARCHWARDEN_ARXIV_EMBED_MODEL or nomic-ai/nomic-embed-text-v1.5."
    ),
)
def arxiv_add(arxiv_ids: tuple[str, ...], embedding_model: Optional[str]) -> None:
    """Download, extract, embed, and index one or more arxiv papers by ID."""
    # Deferred imports: chromadb / torch are heavy, so the CLI only pays the
    # cost when an arxiv command actually runs, not on every invocation.
    from researchers.arxiv.ingest import DEFAULT_EMBEDDING_MODEL, ingest
    from researchers.arxiv.store import ArxivStore

    out = Console()
    corpus = ArxivStore()
    chosen_model = embedding_model or DEFAULT_EMBEDDING_MODEL

    for paper_id in arxiv_ids:
        out.print(f"[dim]Ingesting:[/dim] {paper_id} (model={chosen_model})")
        try:
            result = ingest(paper_id, store=corpus, model_name=chosen_model)
        except Exception as err:
            # Best-effort batch: report the failure and move on to the
            # remaining ids rather than aborting the whole run.
            out.print(f"[bold red]Failed:[/bold red] {paper_id}: {err}")
        else:
            out.print(
                f" -> [green]ok[/green] {result.title or '(no title)'} "
                f"({result.chunks_indexed} chunks)"
            )
|
||||
|
||||
|
||||
@arxiv.command("list")
def arxiv_list() -> None:
    """Show all indexed arxiv papers."""
    from researchers.arxiv.store import ArxivStore

    out = Console()
    records = ArxivStore().list_papers()

    if not records:
        out.print(
            "[dim]No papers indexed yet. Use[/dim] "
            "[bold]marchwarden arxiv add <id>[/bold]"
        )
        return

    table = Table(title=f"Indexed papers ({len(records)})", show_lines=False, expand=True)
    # Column layout for the manifest listing; wide text columns fold.
    for header, opts in (
        ("arxiv_id", {"style": "cyan"}),
        ("Title", {"overflow": "fold"}),
        ("Year", {"justify": "right", "width": 6}),
        ("Chunks", {"justify": "right", "width": 6}),
        ("Model", {"overflow": "fold"}),
    ):
        table.add_column(header, **opts)
    for rec in records:
        table.add_row(
            rec.arxiv_id,
            rec.title or "(no title)",
            str(rec.year) if rec.year else "—",
            str(rec.chunks_indexed),
            rec.embedding_model,
        )
    out.print(table)
|
||||
|
||||
|
||||
@arxiv.command("info")
@click.argument("arxiv_id")
def arxiv_info(arxiv_id: str) -> None:
    """Show metadata + chunk count for one indexed paper."""
    from researchers.arxiv.store import ArxivStore

    out = Console()
    paper = ArxivStore().get_paper(arxiv_id)
    if paper is None:
        out.print(
            f"[bold red]Not indexed:[/bold red] {arxiv_id}. "
            f"Use [bold]marchwarden arxiv add {arxiv_id}[/bold]."
        )
        sys.exit(1)

    # One line per field; only the id line is bold.
    details = Text()
    details.append(f"arxiv_id: {paper.arxiv_id}\n", style="bold")
    for line in (
        f"title: {paper.title or '(none)'}\n",
        f"authors: {', '.join(paper.authors) or '(none)'}\n",
        f"year: {paper.year or '(unknown)'}\n",
        f"category: {paper.category or '(unknown)'}\n",
        f"chunks: {paper.chunks_indexed}\n",
        f"embedding_model: {paper.embedding_model}\n",
        f"added_at: {paper.added_at}\n",
    ):
        details.append(line)
    out.print(Panel(details, title=arxiv_id, border_style="cyan"))
|
||||
|
||||
|
||||
@arxiv.command("remove")
@click.argument("arxiv_id")
def arxiv_remove(arxiv_id: str) -> None:
    """Drop one paper from the manifest and chromadb collection."""
    from researchers.arxiv.store import ArxivStore

    out = Console()
    corpus = ArxivStore()
    # Purge both places the paper can live: the vector collection and the
    # JSON manifest. Either alone counts as "it existed".
    dropped = corpus.delete_paper(arxiv_id)
    was_listed = corpus.remove_paper(arxiv_id)
    if not (was_listed or dropped):
        out.print(f"[yellow]Not found:[/yellow] {arxiv_id}")
        sys.exit(1)
    out.print(
        f"[green]Removed[/green] {arxiv_id} "
        f"({dropped} chunks dropped)"
    )
|
||||
|
||||
|
||||
# Allow direct execution (python cli/main.py) in addition to the installed
# `marchwarden` console-script entry point declared in pyproject.toml.
if __name__ == "__main__":
    cli()
|
||||
|
|
|
|||
|
|
@ -32,6 +32,14 @@ dev = [
|
|||
"ruff>=0.1.0",
|
||||
"mypy>=1.0",
|
||||
]
|
||||
# arxiv-rag researcher (M5.1). Heavy ML deps — optional extra so the base
|
||||
# install stays slim for users who only want the web researcher.
|
||||
arxiv = [
|
||||
"pymupdf>=1.24",
|
||||
"chromadb>=0.5",
|
||||
"sentence-transformers>=3.0",
|
||||
"arxiv>=2.1",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
marchwarden = "cli.main:cli"
|
||||
|
|
|
|||
7
researchers/arxiv/__init__.py
Normal file
7
researchers/arxiv/__init__.py
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
"""arxiv-rag researcher.
|
||||
|
||||
Second researcher implementation in Marchwarden. M5.1.1 is the ingest
|
||||
pipeline only — see researchers.arxiv.ingest and researchers.arxiv.store.
|
||||
The agent loop, MCP server, and ResearchContract integration ship in
|
||||
later sub-milestones (#39–#43).
|
||||
"""
|
||||
370
researchers/arxiv/ingest.py
Normal file
370
researchers/arxiv/ingest.py
Normal file
|
|
@ -0,0 +1,370 @@
|
|||
"""Ingest pipeline for the arxiv-rag researcher.
|
||||
|
||||
Public surface:
|
||||
|
||||
download_pdf(arxiv_id, store) -> Path
|
||||
extract_sections(pdf_path) -> list[Section]
|
||||
embed_and_store(arxiv_id, sections, store, model_name, metadata) -> int
|
||||
ingest(arxiv_id, store=None, model_name=...) -> PaperRecord # one-shot
|
||||
|
||||
The split exists so unit tests can mock each phase independently. The
|
||||
top-level ``ingest()`` is what the CLI calls.
|
||||
|
||||
Section detection is heuristic: we walk the PDF page by page, look for
|
||||
short lines that match a small set of canonical academic headings
|
||||
(introduction, methods, results, discussion, conclusion, references,
|
||||
etc.), and use those as section boundaries. If nothing matches, we fall
|
||||
back to one Section containing the entire paper text — citations to
|
||||
that section will still be valid, just less precise.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Callable, Optional
|
||||
|
||||
from .store import ArxivStore, PaperRecord, make_chunk_id
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Defaults
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Default embedding model, resolved once at import time — changing the env
# var after import has no effect on an already-running process.
DEFAULT_EMBEDDING_MODEL = os.environ.get(
    "MARCHWARDEN_ARXIV_EMBED_MODEL",
    "nomic-ai/nomic-embed-text-v1.5",
)

# Headings considered "section starters" for the heuristic. Order
# matters only for documentation; matching is case-insensitive and
# whole-line.
_SECTION_HEADINGS = [
    "abstract",
    "introduction",
    "background",
    "related work",
    "preliminaries",
    "methods",
    "method",
    "methodology",
    "approach",
    "model",
    "experiments",
    "experimental setup",
    "evaluation",
    "results",
    "discussion",
    "analysis",
    "limitations",
    "conclusion",
    "conclusions",
    "future work",
    "references",
    "acknowledgments",
    "appendix",
]

# Compiled match: optional leading number ("3", "3.1", "III"), optional
# trailing punctuation, the heading word, end of line. The alternation sits
# entirely inside the (?P<title>...) group; headings are plain words so no
# re.escape is needed.
_HEADING_RE = re.compile(
    r"^\s*(?:[0-9IVX]+\.?[0-9.]*)?\s*(?P<title>" + "|".join(_SECTION_HEADINGS) + r")\s*$",
    re.IGNORECASE,
)
|
||||
|
||||
|
||||
@dataclass
class Section:
    """One section of a paper.

    Produced by extract_sections(); each Section becomes one embedded
    chunk in embed_and_store().
    """

    index: int       # zero-based position of the section within the paper
    title: str       # normalized heading (title-cased), or "Full Paper" fallback
    text: str        # newline-joined body text of the section
    page_start: int  # 1-based page where the section starts
    page_end: int    # 1-based page where the section's last body line sits
||||
|
||||
|
||||
@dataclass
class PaperMetadata:
    """Lightweight metadata extracted from arxiv at download time."""

    arxiv_id: str                          # the id the user asked for
    version: str                           # arxiv version digits ("2"), "" if unknown
    title: str
    authors: list[str] = field(default_factory=list)
    year: Optional[int] = None             # publication year, if arxiv reports one
    category: Optional[str] = None         # primary category, e.g. "cs.CL"


# Matches a trailing arxiv version marker, e.g. ".../abs/2403.12345v2" -> "2".
_ENTRY_VERSION_RE = re.compile(r"v(\d+)$")


def download_pdf(
    arxiv_id: str,
    store: ArxivStore,
    *,
    arxiv_search: Optional[Callable] = None,
) -> tuple[Path, PaperMetadata]:
    """Download a paper PDF and return its cached path + arxiv metadata.

    The PDF is cached under ``store.pdfs_dir / "<id>.pdf"``; an existing
    file is reused without re-downloading.

    ``arxiv_search`` is injectable for tests so we can avoid hitting the
    real arxiv API. The default uses the ``arxiv`` package.

    Raises:
        ValueError: if the id yields no results.
    """
    target = store.pdfs_dir / f"{arxiv_id}.pdf"

    if arxiv_search is None:
        import arxiv as arxiv_pkg

        # NOTE(review): Search.results() is deprecated in arxiv>=2.x in
        # favour of Client().results(search) — confirm against the pinned
        # "arxiv>=2.1" dependency and migrate if it warns.
        search = arxiv_pkg.Search(id_list=[arxiv_id])
        results = list(search.results())
    else:
        results = list(arxiv_search(arxiv_id))

    if not results:
        raise ValueError(f"arxiv id not found: {arxiv_id}")

    paper = results[0]

    # Download the PDF if we don't already have it cached.
    if not target.exists():
        # Both the real arxiv.Result and our test stub expose
        # download_pdf(dirpath, filename). Test stubs may also accept a
        # destination Path directly — try the kwargs form first, fall back.
        try:
            paper.download_pdf(
                dirpath=str(store.pdfs_dir),
                filename=f"{arxiv_id}.pdf",
            )
        except TypeError:
            paper.download_pdf(str(target))

    # BUG FIX: the previous parse did entry_id.rsplit("v", 1) guarded by
    # `"v" in entry_id`. Every entry_id is a URL containing "arxiv", so the
    # guard always passed, and an id without a version suffix
    # (".../abs/2403.12345") produced garbage like "/abs/2403.12345".
    # Anchor on a trailing v<digits> instead; absent marker -> "".
    entry_id = getattr(paper, "entry_id", "") or ""
    version_match = _ENTRY_VERSION_RE.search(entry_id)

    metadata = PaperMetadata(
        arxiv_id=arxiv_id,
        version=version_match.group(1) if version_match else "",
        title=getattr(paper, "title", "") or "",
        authors=[
            getattr(a, "name", str(a))
            for a in (getattr(paper, "authors", []) or [])
        ],
        year=(
            getattr(paper, "published", None).year
            if getattr(paper, "published", None) is not None
            else None
        ),
        category=getattr(paper, "primary_category", None),
    )
    return target, metadata
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 2 — extract sections
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _pdf_lines(pdf_path: Path) -> list[tuple[int, str]]:
    """Flatten the PDF into (1-based page number, stripped non-empty line) pairs."""
    import pymupdf

    doc = pymupdf.open(str(pdf_path))
    try:
        lines: list[tuple[int, str]] = []
        for page_num, page in enumerate(doc, start=1):
            text = page.get_text("text") or ""
            for raw_line in text.splitlines():
                stripped = raw_line.strip()
                if stripped:
                    lines.append((page_num, stripped))
        return lines
    finally:
        doc.close()


def _whole_paper_section(lines: list[tuple[int, str]]) -> list[Section]:
    """Fallback: the entire document as one Section, or [] if there is no text."""
    full_text = "\n".join(line for _, line in lines)
    if not full_text.strip():
        return []
    return [
        Section(
            index=0,
            title="Full Paper",
            text=full_text,
            page_start=lines[0][0],
            page_end=lines[-1][0],
        )
    ]


def extract_sections(pdf_path: Path) -> list[Section]:
    """Extract section-level chunks from a PDF.

    Heuristic: walk pages, split on lines that match a known section
    heading. If no headings are detected — or every detected section is
    empty — return one Section containing the whole document (citations to
    it are still valid, just less precise). Returns [] for an empty PDF.
    """
    lines = _pdf_lines(pdf_path)

    # Find heading boundaries: (line_index, normalized title, page_num).
    boundaries: list[tuple[int, str, int]] = []
    for i, (page_num, line) in enumerate(lines):
        if len(line) > 80:
            # Section headings are short. Skip likely body text.
            continue
        m = _HEADING_RE.match(line)
        if m:
            boundaries.append((i, m.group("title").strip().title(), page_num))

    if not boundaries:
        return _whole_paper_section(lines)

    # Build sections between consecutive boundaries.
    sections: list[Section] = []
    for idx, (start_line, title, page_start) in enumerate(boundaries):
        end_line = (
            boundaries[idx + 1][0] if idx + 1 < len(boundaries) else len(lines)
        )
        body_lines = lines[start_line + 1 : end_line]
        text = "\n".join(line for _, line in body_lines).strip()
        if not text:
            # Heading with no body (e.g. two headings back to back) — skip.
            continue
        sections.append(
            Section(
                index=idx,
                title=title,
                text=text,
                page_start=page_start,
                page_end=body_lines[-1][0] if body_lines else page_start,
            )
        )

    if not sections:
        # Headings detected but every section was empty — fall back to the
        # whole paper rather than dropping the document.
        return _whole_paper_section(lines)

    return sections
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 3 — embed and store
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _load_embedder(model_name: str):
|
||||
"""Load a sentence-transformers embedder. Cached at module level so
|
||||
repeated ingests in the same process don't re-download / re-load.
|
||||
"""
|
||||
cache = _load_embedder._cache # type: ignore[attr-defined]
|
||||
if model_name in cache:
|
||||
return cache[model_name]
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
embedder = SentenceTransformer(model_name, trust_remote_code=True)
|
||||
cache[model_name] = embedder
|
||||
return embedder
|
||||
|
||||
|
||||
_load_embedder._cache = {} # type: ignore[attr-defined]
|
||||
|
||||
|
||||
def embed_and_store(
    arxiv_id: str,
    sections: list[Section],
    store: ArxivStore,
    model_name: str,
    metadata: PaperMetadata,
    *,
    embedder: Optional[object] = None,
) -> int:
    """Embed each section and write to the chromadb collection.

    ``embedder`` is injectable for tests so we don't have to load
    sentence-transformers. It must expose ``encode(list[str]) -> list[list[float]]``.
    Returns the number of chunks written (0 for an empty section list).
    """
    if not sections:
        return 0

    encoder = embedder if embedder is not None else _load_embedder(model_name)

    texts = [s.text for s in sections]
    raw_vectors = encoder.encode(texts)
    # sentence-transformers returns a numpy.ndarray; chromadb wants plain
    # lists. Normalize either shape via tolist()/list().
    vectors = [
        vec.tolist() if hasattr(vec, "tolist") else list(vec)
        for vec in raw_vectors
    ]

    chunk_ids = [make_chunk_id(arxiv_id, s.index, model_name) for s in sections]
    chunk_meta = [
        {
            "arxiv_id": arxiv_id,
            "section_index": s.index,
            "section_title": s.title,
            "page_start": s.page_start,
            "page_end": s.page_end,
            "title": metadata.title,
            "embedding_model": model_name,
        }
        for s in sections
    ]

    # Replace any prior chunks for this paper before re-adding, so
    # re-ingest is idempotent in observable state.
    # NOTE(review): delete_paper() filters only on arxiv_id, so it drops
    # chunks for *all* embedding models, not just ``model_name`` — confirm
    # that's intended given the model-scoped chunk-id design.
    store.delete_paper(arxiv_id)
    store.add_chunks(ids=chunk_ids, documents=texts, embeddings=vectors, metadatas=chunk_meta)
    return len(chunk_ids)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Top-level orchestrator
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def ingest(
    arxiv_id: str,
    store: Optional[ArxivStore] = None,
    *,
    model_name: str = DEFAULT_EMBEDDING_MODEL,
    arxiv_search: Optional[Callable] = None,
    embedder: Optional[object] = None,
) -> PaperRecord:
    """End-to-end ingest: download → extract → embed → store → manifest.

    ``arxiv_search`` and ``embedder`` are test-injection points forwarded
    to the individual phases; ``store`` defaults to the user-level store.
    Returns the manifest record that was written.
    """
    if store is None:
        store = ArxivStore()

    pdf_path, metadata = download_pdf(arxiv_id, store, arxiv_search=arxiv_search)
    indexed = embed_and_store(
        arxiv_id=arxiv_id,
        sections=extract_sections(pdf_path),
        store=store,
        model_name=model_name,
        metadata=metadata,
        embedder=embedder,
    )

    record = PaperRecord(
        arxiv_id=arxiv_id,
        version=metadata.version,
        title=metadata.title,
        authors=metadata.authors,
        year=metadata.year,
        category=metadata.category,
        chunks_indexed=indexed,
        embedding_model=model_name,
    )
    store.upsert_paper(record)
    return record
|
||||
214
researchers/arxiv/store.py
Normal file
214
researchers/arxiv/store.py
Normal file
|
|
@ -0,0 +1,214 @@
|
|||
"""Chromadb wrapper for the arxiv-rag researcher.
|
||||
|
||||
The store lives at ``~/.marchwarden/arxiv-rag/`` and contains:
|
||||
|
||||
- ``papers.json`` — manifest mapping arxiv_id -> metadata
|
||||
- ``pdfs/<id>.pdf`` — cached PDFs
|
||||
- ``chroma/`` — chromadb persistent collection of embedded chunks
|
||||
|
||||
This module is intentionally narrow: it exposes the persistent state and
|
||||
the operations the ingest + retrieval layers need (add chunks, fetch
|
||||
manifest, list papers). The retrieval layer (#39) will add a query API
|
||||
on top of the same collection.
|
||||
|
||||
Chunk IDs are deterministic and include the embedding model name in
|
||||
their hash so re-ingesting a paper with a different embedder creates a
|
||||
new ID space rather than silently overwriting old citations
|
||||
(ArxivRagProposal §1, decision 4).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Iterable, Optional
|
||||
|
||||
|
||||
DEFAULT_ROOT = Path(os.path.expanduser("~/.marchwarden/arxiv-rag"))
|
||||
DEFAULT_COLLECTION = "arxiv_chunks"
|
||||
|
||||
|
||||
@dataclass
class PaperRecord:
    """Manifest entry for one indexed paper.

    Serialized into papers.json keyed by arxiv_id, so to_dict()/from_dict()
    carry the id outside the payload.
    """

    arxiv_id: str
    version: str
    title: str
    authors: list[str]
    year: Optional[int]
    category: Optional[str]
    chunks_indexed: int
    embedding_model: str
    # UTC timestamp like "2024-01-02T03:04:05Z", captured at creation time.
    added_at: str = field(
        default_factory=lambda: (
            datetime.now(timezone.utc)
            .isoformat(timespec="seconds")
            .replace("+00:00", "Z")
        )
    )

    def to_dict(self) -> dict:
        """Serialize for the manifest; arxiv_id is the mapping key, so omit it."""
        payload = {
            "version": self.version,
            "title": self.title,
            "authors": list(self.authors),
            "year": self.year,
            "category": self.category,
            "chunks_indexed": self.chunks_indexed,
            "embedding_model": self.embedding_model,
            "added_at": self.added_at,
        }
        return payload

    @classmethod
    def from_dict(cls, arxiv_id: str, data: dict) -> "PaperRecord":
        """Rehydrate a manifest entry; missing keys get benign defaults."""
        get = data.get
        return cls(
            arxiv_id=arxiv_id,
            version=get("version", ""),
            title=get("title", ""),
            authors=list(get("authors", [])),
            year=get("year"),
            category=get("category"),
            chunks_indexed=int(get("chunks_indexed", 0)),
            embedding_model=get("embedding_model", ""),
            added_at=get("added_at", ""),
        )
|
||||
|
||||
|
||||
def make_chunk_id(arxiv_id: str, section_index: int, embedding_model: str) -> str:
    """Deterministic chunk id, scoped by embedding model.

    Format: ``<arxiv_id>::<section_index>::<sha1(model)[0:8]>``. The
    model-hash slice keeps the id readable while making it unique across
    embedding models, so re-ingesting with a different model cannot
    collide with prior chunks (ArxivRagProposal §1, decision 4).
    """
    digest = hashlib.sha1(embedding_model.encode("utf-8")).hexdigest()
    return "::".join((arxiv_id, f"{section_index:04d}", digest[:8]))
|
||||
|
||||
|
||||
class ArxivStore:
    """File-backed manifest + chromadb collection for indexed papers.

    On-disk layout under ``root`` (default ``~/.marchwarden/arxiv-rag``):
    ``papers.json`` manifest, ``pdfs/`` PDF cache, ``chroma/`` chromadb
    persistent data.
    """

    def __init__(
        self,
        root: Optional[Path] = None,
        collection_name: str = DEFAULT_COLLECTION,
    ):
        """Create the store rooted at ``root``, making directories eagerly."""
        self.root = Path(root) if root else DEFAULT_ROOT
        self.pdfs_dir = self.root / "pdfs"
        self.chroma_dir = self.root / "chroma"
        self.manifest_path = self.root / "papers.json"
        self.collection_name = collection_name

        # Create the layout up front so later operations can assume it exists.
        self.root.mkdir(parents=True, exist_ok=True)
        self.pdfs_dir.mkdir(parents=True, exist_ok=True)
        self.chroma_dir.mkdir(parents=True, exist_ok=True)

        self._client = None  # lazy chromadb.PersistentClient
        self._collection = None  # lazy collection handle (see `collection`)

    # ------------------------------------------------------------------
    # Chroma — lazy because importing chromadb is slow
    # ------------------------------------------------------------------

    @property
    def collection(self):
        """Lazy chromadb collection handle.

        First access imports chromadb and opens/creates the persistent
        collection; later accesses reuse the cached handle. The test suite
        replaces this property wholesale with an in-memory stub.
        """
        if self._collection is None:
            import chromadb

            self._client = chromadb.PersistentClient(path=str(self.chroma_dir))
            self._collection = self._client.get_or_create_collection(
                name=self.collection_name,
                # Cosine distance — typical for sentence-transformer
                # embeddings normalized to unit length.
                metadata={"hnsw:space": "cosine"},
            )
        return self._collection

    def add_chunks(
        self,
        ids: list[str],
        documents: list[str],
        embeddings: list[list[float]],
        metadatas: list[dict[str, Any]],
    ) -> None:
        """Add a batch of embedded chunks to the collection.

        All four lists must be parallel (same length); an empty batch is a
        no-op. Uses upsert, so re-adding existing ids replaces them.
        """
        if not ids:
            return
        if not (len(ids) == len(documents) == len(embeddings) == len(metadatas)):
            raise ValueError(
                "ids/documents/embeddings/metadatas must all have the same length"
            )
        self.collection.upsert(
            ids=ids,
            documents=documents,
            embeddings=embeddings,
            metadatas=metadatas,
        )

    def chunk_count_for(self, arxiv_id: str) -> int:
        """Number of chunks currently stored for one paper."""
        # chromadb's get() with a where filter returns the matching docs;
        # we just need the count. Best-effort: any collection failure
        # (missing / corrupt store) counts as zero rather than raising.
        try:
            res = self.collection.get(where={"arxiv_id": arxiv_id})
        except Exception:
            return 0
        return len(res.get("ids", []))

    def delete_paper(self, arxiv_id: str) -> int:
        """Remove all chunks for one paper. Returns number deleted.

        Filters on arxiv_id only, so chunks from *every* embedding model
        for that paper are dropped.
        """
        before = self.chunk_count_for(arxiv_id)
        if before == 0:
            return 0
        self.collection.delete(where={"arxiv_id": arxiv_id})
        return before

    # ------------------------------------------------------------------
    # Manifest — plain JSON, atomic write
    # ------------------------------------------------------------------

    def load_manifest(self) -> dict[str, PaperRecord]:
        """Read papers.json into {arxiv_id: PaperRecord}. Returns {} if missing."""
        if not self.manifest_path.exists():
            return {}
        data = json.loads(self.manifest_path.read_text(encoding="utf-8"))
        return {
            arxiv_id: PaperRecord.from_dict(arxiv_id, entry)
            for arxiv_id, entry in data.items()
        }

    def save_manifest(self, manifest: dict[str, PaperRecord]) -> None:
        """Write papers.json atomically (write temp file, then rename over)."""
        payload = {arxiv_id: rec.to_dict() for arxiv_id, rec in manifest.items()}
        # "papers.json" -> "papers.json.tmp"; replace() is atomic on POSIX.
        tmp = self.manifest_path.with_suffix(".json.tmp")
        tmp.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
        tmp.replace(self.manifest_path)

    def upsert_paper(self, record: PaperRecord) -> None:
        """Insert or replace one entry in the manifest (read-modify-write)."""
        manifest = self.load_manifest()
        manifest[record.arxiv_id] = record
        self.save_manifest(manifest)

    def remove_paper(self, arxiv_id: str) -> bool:
        """Drop one entry from the manifest. Returns True if it was present."""
        manifest = self.load_manifest()
        if arxiv_id not in manifest:
            return False
        del manifest[arxiv_id]
        self.save_manifest(manifest)
        return True

    def list_papers(self) -> list[PaperRecord]:
        """All manifest entries, sorted by added_at descending (newest first)."""
        manifest = self.load_manifest()
        return sorted(manifest.values(), key=lambda r: r.added_at, reverse=True)

    def get_paper(self, arxiv_id: str) -> Optional[PaperRecord]:
        """Manifest entry for one paper, or None if not indexed."""
        return self.load_manifest().get(arxiv_id)
||||
415
tests/test_arxiv_ingest.py
Normal file
415
tests/test_arxiv_ingest.py
Normal file
|
|
@ -0,0 +1,415 @@
|
|||
"""Tests for the arxiv-rag ingest pipeline (M5.1.1).
|
||||
|
||||
Strategy: mock the slow / network bits (arxiv API, embedder, chromadb)
|
||||
and exercise the real pipeline against synthetic PDFs generated with
|
||||
pymupdf at test time. This keeps the tests deterministic, fast, and
|
||||
network-free while still exercising the actual extract_sections logic.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from researchers.arxiv import ingest as ingest_mod
|
||||
from researchers.arxiv.ingest import (
|
||||
PaperMetadata,
|
||||
Section,
|
||||
embed_and_store,
|
||||
extract_sections,
|
||||
ingest,
|
||||
)
|
||||
from researchers.arxiv.store import ArxivStore, PaperRecord, make_chunk_id
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_synthetic_pdf(path: Path, sections: list[tuple[str, str]]) -> None:
    """Build a tiny PDF with one section per (heading, body) tuple.

    Each tuple gets its own page: the heading near the top in a larger
    font, then the body one line at a time below it. pymupdf is already a
    hard dep of the arxiv extra, so synthesizing a fixture PDF inline is
    cheaper than checking a binary into the repo.
    """
    import pymupdf

    doc = pymupdf.open()
    for heading, body in sections:
        page = doc.new_page()
        page.insert_text((50, 80), heading, fontsize=14)
        # Wrap body across a few lines for realism
        y = 110
        for line in body.split("\n"):
            page.insert_text((50, y), line, fontsize=11)
            y += 16
    doc.save(str(path))
    doc.close()
|
||||
|
||||
|
||||
@pytest.fixture
def store(tmp_path):
    """ArxivStore rooted in a temp directory (never touches ~/.marchwarden)."""
    return ArxivStore(root=tmp_path / "arxiv-rag")
|
||||
|
||||
|
||||
class StubEmbedder:
    """Minimal stand-in for sentence-transformers.SentenceTransformer.

    Records every encode() call so tests can assert what was embedded.
    """

    def __init__(self, dim: int = 4):
        self.dim = dim
        # One entry per encode() call: the list of texts it received.
        self.calls: list[list[str]] = []

    def encode(self, texts):
        batch = list(texts)
        self.calls.append(batch)
        # Deterministic vectors keyed off text length so two different
        # sections produce two different embeddings.
        return [[float(len(t))] + [0.0] * 3 for t in batch]
||||
|
||||
|
||||
class StubChromaCollection:
    """In-memory drop-in for a chromadb collection.

    Supports exactly the subset the store uses: upsert, get(where=...),
    delete(where=...). ``where`` filters are equality-ANDed over metadata.
    """

    def __init__(self):
        # chunk id -> {"document", "embedding", "metadata"}
        self.docs: dict[str, dict] = {}

    def _matching_ids(self, where):
        """Ids whose metadata matches every key/value pair in ``where``."""
        return [
            cid
            for cid, entry in self.docs.items()
            if all(entry["metadata"].get(k) == v for k, v in where.items())
        ]

    def upsert(self, ids, documents, embeddings, metadatas):
        for cid, doc, emb, meta in zip(ids, documents, embeddings, metadatas):
            self.docs[cid] = {"document": doc, "embedding": emb, "metadata": meta}

    def get(self, where=None):
        if where is None:
            return {"ids": list(self.docs)}
        return {"ids": self._matching_ids(where)}

    def delete(self, where):
        for cid in self._matching_ids(where):
            del self.docs[cid]
|
||||
|
||||
|
||||
@pytest.fixture
def stub_collection(monkeypatch):
    """Replace ArxivStore.collection with an in-memory stub.

    Patches the *class-level* property, so every ArxivStore constructed
    during the test shares this one stub collection.
    """
    stub = StubChromaCollection()
    monkeypatch.setattr(
        ArxivStore, "collection", property(lambda self: stub)
    )
    return stub
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# extract_sections — real pymupdf, synthetic PDFs
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExtractSections:
    # These tests exercise the real pymupdf extraction path against PDFs
    # synthesized by _make_synthetic_pdf — deterministic and network-free.

    def test_detects_canonical_headings(self, tmp_path):
        """Each canonical-heading page becomes its own Section carrying its body."""
        pdf = tmp_path / "paper.pdf"
        _make_synthetic_pdf(
            pdf,
            [
                ("Introduction", "We study X. We find Y."),
                ("Methods", "We used Z to evaluate Y."),
                ("Results", "Accuracy was 95%."),
                ("Conclusion", "X works."),
            ],
        )
        sections = extract_sections(pdf)
        titles = [s.title.lower() for s in sections]
        assert "introduction" in titles
        assert "methods" in titles
        assert "results" in titles
        assert "conclusion" in titles
        # Body text from each section should be present
        intro = next(s for s in sections if s.title.lower() == "introduction")
        assert "we study x" in intro.text.lower()

    def test_falls_back_to_whole_paper_when_no_headings(self, tmp_path):
        """With no recognized headings, the whole doc collapses into 'Full Paper'."""
        pdf = tmp_path / "no-headings.pdf"
        _make_synthetic_pdf(
            pdf,
            [
                ("Some random title nobody recognizes", "Body text body text."),
            ],
        )
        sections = extract_sections(pdf)
        assert len(sections) == 1
        assert sections[0].title == "Full Paper"
        assert "body text" in sections[0].text.lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# embed_and_store — uses stub collection + stub embedder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestEmbedAndStore:
    """embed_and_store against the in-memory stub collection and stub embedder."""

    def test_writes_chunks_and_returns_count(self, store, stub_collection):
        """Two sections yield two model-scoped chunks with round-tripped metadata."""
        sections = [
            Section(index=0, title="Intro", text="aaa", page_start=1, page_end=1),
            Section(index=1, title="Methods", text="bbbb", page_start=2, page_end=2),
        ]
        meta = PaperMetadata(arxiv_id="2403.12345", version="v1", title="Test")

        count = embed_and_store(
            arxiv_id="2403.12345",
            sections=sections,
            store=store,
            model_name="stub-model",
            metadata=meta,
            embedder=StubEmbedder(),
        )

        assert count == 2
        assert len(stub_collection.docs) == 2

        # Chunk ids must be scoped to the embedding model.
        expected_ids = {make_chunk_id("2403.12345", idx, "stub-model") for idx in (0, 1)}
        assert set(stub_collection.docs) == expected_ids

        # Metadata round-trips into the stored chunk.
        stored = next(iter(stub_collection.docs.values()))
        assert stored["metadata"]["arxiv_id"] == "2403.12345"
        assert stored["metadata"]["embedding_model"] == "stub-model"

    def test_re_embed_replaces_existing_chunks(self, store, stub_collection):
        """Re-embedding with fewer sections replaces old chunks, never appends."""
        meta = PaperMetadata(arxiv_id="2403.12345", version="v1", title="Test")
        two_sections = [
            Section(index=0, title="Intro", text="first", page_start=1, page_end=1),
            Section(index=1, title="Methods", text="second", page_start=2, page_end=2),
        ]
        embed_and_store(
            "2403.12345", two_sections, store, "stub-model", meta,
            embedder=StubEmbedder(),
        )
        assert len(stub_collection.docs) == 2

        # Re-embed with fewer sections — the second chunk must disappear.
        one_section = two_sections[:1]
        embed_and_store(
            "2403.12345", one_section, store, "stub-model", meta,
            embedder=StubEmbedder(),
        )
        assert len(stub_collection.docs) == 1

    def test_empty_sections_is_noop(self, store, stub_collection):
        """No sections: nothing written, zero returned."""
        meta = PaperMetadata(arxiv_id="x", version="", title="")
        count = embed_and_store(
            "x", [], store, "stub-model", meta, embedder=StubEmbedder()
        )
        assert count == 0
        assert stub_collection.docs == {}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Top-level ingest() — full pipeline with mocked download
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _stub_arxiv_search(arxiv_id: str):
    """Return a single fake arxiv search result for ``arxiv_id``.

    Mimics just enough of an ``arxiv.Search`` result for ``ingest``: the
    paper object carries the metadata attributes the pipeline reads, plus
    a ``download_pdf`` callable that writes a synthetic PDF.
    """

    def _download_pdf(dirpath=None, filename=None):
        # Generate a synthetic PDF on the fly so the rest of the
        # pipeline has something real to read.
        destination = Path(dirpath) / filename
        headings = [
            ("Introduction", "Stub paper introduction."),
            ("Methods", "Stub paper methods."),
            ("Results", "Stub paper results."),
        ]
        _make_synthetic_pdf(destination, headings)

    fake_paper = SimpleNamespace(
        entry_id=f"http://arxiv.org/abs/{arxiv_id}v1",
        title=f"Test paper {arxiv_id}",
        authors=[SimpleNamespace(name="Alice"), SimpleNamespace(name="Bob")],
        published=datetime(2024, 1, 15, tzinfo=timezone.utc),
        primary_category="cs.LG",
        download_pdf=_download_pdf,
    )
    return [fake_paper]
|
||||
|
||||
|
||||
class TestIngest:
    """Full ingest() pipeline with the arxiv download stubbed out."""

    @staticmethod
    def _ingest_stub_paper(store):
        # One place to spell out the common ingest invocation.
        return ingest(
            "2403.12345",
            store=store,
            model_name="stub-model",
            arxiv_search=_stub_arxiv_search,
            embedder=StubEmbedder(),
        )

    def test_end_to_end(self, store, stub_collection):
        record = self._ingest_stub_paper(store)

        # Manifest entry
        assert isinstance(record, PaperRecord)
        assert record.arxiv_id == "2403.12345"
        assert record.title == "Test paper 2403.12345"
        assert record.authors == ["Alice", "Bob"]
        assert record.year == 2024
        assert record.category == "cs.LG"
        assert record.chunks_indexed >= 1
        assert record.embedding_model == "stub-model"

        # Manifest persisted to disk
        manifest = store.load_manifest()
        assert "2403.12345" in manifest
        assert manifest["2403.12345"].chunks_indexed == record.chunks_indexed

        # PDF cached
        assert (store.pdfs_dir / "2403.12345.pdf").exists()

        # Chunks landed in the stub collection
        assert len(stub_collection.docs) == record.chunks_indexed

    def test_idempotent_reingest(self, store, stub_collection):
        first = self._ingest_stub_paper(store)
        baseline = len(stub_collection.docs)

        second = self._ingest_stub_paper(store)

        # Same number of chunks (replace, not append)
        assert len(stub_collection.docs) == baseline
        assert second.chunks_indexed == first.chunks_indexed

    def test_unknown_arxiv_id_raises(self, store):
        with pytest.raises(ValueError, match="not found"):
            ingest(
                "9999.99999",
                store=store,
                model_name="stub-model",
                arxiv_search=lambda _id: [],
                embedder=StubEmbedder(),
            )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Manifest CRUD via ArxivStore
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestManifest:
    """Manifest CRUD through ArxivStore."""

    @staticmethod
    def _record(**overrides):
        # Minimal valid PaperRecord; keyword overrides customize fields.
        fields = dict(
            arxiv_id="2401.00001",
            version="",
            title="t",
            authors=[],
            year=None,
            category=None,
            chunks_indexed=0,
            embedding_model="m",
        )
        fields.update(overrides)
        return PaperRecord(**fields)

    def test_load_returns_empty_dict_when_missing(self, store):
        assert store.load_manifest() == {}

    def test_round_trip(self, store):
        rec = self._record(
            version="v2",
            title="Round trip test",
            authors=["A", "B"],
            year=2024,
            category="cs.AI",
            chunks_indexed=7,
        )
        store.upsert_paper(rec)

        loaded = store.load_manifest()
        assert "2401.00001" in loaded
        assert loaded["2401.00001"].title == "Round trip test"
        assert loaded["2401.00001"].chunks_indexed == 7

    def test_remove_paper(self, store):
        store.upsert_paper(self._record())
        assert store.remove_paper("2401.00001") is True
        assert store.load_manifest() == {}
        # Removing again reports that nothing was there.
        assert store.remove_paper("2401.00001") is False

    def test_list_sorted_newest_first(self, store):
        older = self._record(
            arxiv_id="old", title="old", added_at="2020-01-01T00:00:00Z"
        )
        newer = self._record(
            arxiv_id="new", title="new", added_at="2026-01-01T00:00:00Z"
        )
        store.upsert_paper(older)
        store.upsert_paper(newer)

        listed = store.list_papers()
        assert [p.arxiv_id for p in listed] == ["new", "old"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# CLI smoke (without actually calling chromadb)
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestArxivCLI:
    """CLI smoke tests that never touch chromadb."""

    @staticmethod
    def _invoke(tmp_path, monkeypatch, args):
        # Point the store at a scratch directory, then run the CLI in-process.
        from click.testing import CliRunner

        from cli.main import cli

        monkeypatch.setattr(
            "researchers.arxiv.store.DEFAULT_ROOT",
            tmp_path / "arxiv-rag",
        )
        return CliRunner().invoke(cli, args)

    def test_list_empty(self, tmp_path, monkeypatch):
        result = self._invoke(tmp_path, monkeypatch, ["arxiv", "list"])
        assert result.exit_code == 0, result.output
        assert "No papers indexed" in result.output

    def test_info_missing(self, tmp_path, monkeypatch):
        result = self._invoke(
            tmp_path, monkeypatch, ["arxiv", "info", "0000.00000"]
        )
        assert result.exit_code == 1
        assert "Not indexed" in result.output
|
||||
Loading…
Reference in a new issue