feat: add rich ui for converting
This commit is contained in:
+23
-3
@@ -157,26 +157,46 @@ Convert papers from PDF to Markdown using MinerU.
|
||||
**Options:**
|
||||
- `--library PATH`: Specify library directory
|
||||
- `--paper-id ID`: Convert specific paper only
|
||||
- `--retry-failed`: Retry papers with failed conversion status
|
||||
- `--force`: Force reconvert all papers (including successful ones)
|
||||
- `--no-ui`: Disable rich UI display (useful for scripting)
|
||||
|
||||
**Examples:**
|
||||
```bash
|
||||
# Convert all pending papers
|
||||
# Convert all pending papers (with rich UI)
|
||||
paperlib convert
|
||||
|
||||
# Retry failed conversions
|
||||
paperlib convert --retry-failed
|
||||
|
||||
# Force reconvert all papers
|
||||
paperlib convert --force
|
||||
|
||||
# Convert specific paper
|
||||
paperlib convert --paper-id arxiv-2212_06340
|
||||
|
||||
# Convert without UI (for scripts)
|
||||
paperlib convert --no-ui
|
||||
|
||||
# Convert in specific library
|
||||
paperlib convert --library ~/research
|
||||
```
|
||||
|
||||
**Behavior:**
|
||||
- Processes papers with `conversion_status: pending`
|
||||
- Uses MinerU for PDF to Markdown conversion
|
||||
- Processes papers with `conversion_status: pending` (or failed with `--retry-failed`)
|
||||
- Uses MinerU for PDF to Markdown conversion with CPU pipeline backend
|
||||
- Shows rich UI with progress bar and live MinerU output (unless `--no-ui`)
|
||||
- Updates metadata with conversion status
|
||||
- Creates conversion logs in `logs/` directory
|
||||
- Post-processes markdown to fix image references (`images/` → `assets/`)
|
||||
- Handles conversion failures gracefully
|
||||
|
||||
**Rich UI Features:**
|
||||
- Progress bar showing papers converted
|
||||
- Live streaming of MinerU output
|
||||
- Current paper being processed
|
||||
- Color-coded output (errors in red, progress in blue, etc.)
|
||||
|
||||
---
|
||||
|
||||
### `paperlib reindex`
|
||||
|
||||
+16
-11
@@ -100,6 +100,9 @@ def _build_parser() -> argparse.ArgumentParser:
|
||||
convert_parser.add_argument(
|
||||
"--force", action="store_true", help="Force reconvert successful papers"
|
||||
)
|
||||
convert_parser.add_argument(
|
||||
"--no-ui", action="store_true", help="Disable rich UI (useful for scripting)"
|
||||
)
|
||||
convert_parser.set_defaults(handler=_handle_convert)
|
||||
|
||||
# Reindex command
|
||||
@@ -329,22 +332,24 @@ def _handle_convert(args: argparse.Namespace) -> int:
|
||||
return 1
|
||||
else:
|
||||
# Convert papers based on flags
|
||||
use_ui = not args.no_ui # Use UI unless explicitly disabled
|
||||
success_count, failure_count = converter.convert_all_pending(
|
||||
retry_failed=args.retry_failed, force=args.force
|
||||
retry_failed=args.retry_failed, force=args.force, use_ui=use_ui
|
||||
)
|
||||
|
||||
# Show what was attempted
|
||||
if args.force:
|
||||
action = "Force converted"
|
||||
elif args.retry_failed:
|
||||
action = "Converted pending and retried failed"
|
||||
else:
|
||||
action = "Converted pending"
|
||||
# Show what was attempted (if not using UI, UI will show its own summary)
|
||||
if args.no_ui or (success_count == 0 and failure_count == 0):
|
||||
if args.force:
|
||||
action = "Force converted"
|
||||
elif args.retry_failed:
|
||||
action = "Converted pending and retried failed"
|
||||
else:
|
||||
action = "Converted pending"
|
||||
|
||||
msg = f"{action}: {success_count} successful, {failure_count} failed"
|
||||
print(msg)
|
||||
|
||||
msg = f"{action}: {success_count} successful, {failure_count} failed"
|
||||
print(msg)
|
||||
return 0 if failure_count == 0 else 1
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error during conversion: {e}")
|
||||
return 1
|
||||
|
||||
@@ -5,9 +5,11 @@ from __future__ import annotations
|
||||
import logging
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from paperlib.models import ConversionStatus, PaperMetadata
|
||||
from paperlib.storage import PaperStorageManager
|
||||
from paperlib.ui import ConversionUI
|
||||
|
||||
|
||||
class MinerUConverter:
|
||||
@@ -67,8 +69,9 @@ class MinerUConverter:
|
||||
temp_output_dir = cache_dir / f"mineru_temp_{metadata.paper_id}"
|
||||
temp_output_dir.mkdir(exist_ok=True)
|
||||
|
||||
# Run MinerU conversion
|
||||
# Clear/create log file to start fresh
|
||||
log_file = logs_dir / "mineru.log"
|
||||
log_file.write_text("") # Clear existing log content
|
||||
|
||||
# Correct MinerU command
|
||||
cmd = [
|
||||
@@ -93,13 +96,16 @@ class MinerUConverter:
|
||||
|
||||
# Check if conversion was successful
|
||||
if result.returncode == 0:
|
||||
# MinerU outputs to <output_dir>/<filename>/
|
||||
# MinerU outputs to <output_dir>/<filename>/auto/
|
||||
pdf_stem = pdf_path.stem # Get filename without .pdf extension
|
||||
mineru_output_dir = temp_output_dir / pdf_stem
|
||||
mineru_output_dir = temp_output_dir / pdf_stem / "auto"
|
||||
expected_markdown = mineru_output_dir / f"{pdf_stem}.md"
|
||||
expected_images = mineru_output_dir / "images"
|
||||
|
||||
if expected_markdown.exists():
|
||||
# Post-process markdown file before moving
|
||||
self._post_process_markdown(expected_markdown)
|
||||
|
||||
# Move markdown file to paper directory
|
||||
expected_markdown.rename(markdown_path)
|
||||
|
||||
@@ -130,6 +136,11 @@ class MinerUConverter:
|
||||
self.logger.error(
|
||||
f"Expected markdown file not found: {expected_markdown}"
|
||||
)
|
||||
# For debugging, list what files were actually created
|
||||
if temp_output_dir.exists():
|
||||
created_files = list(temp_output_dir.rglob("*"))
|
||||
files_str = [str(f) for f in created_files]
|
||||
self.logger.error(f"Files created by MinerU: {files_str}")
|
||||
metadata.conversion_status = ConversionStatus.FAILED
|
||||
self.storage_manager.update_paper_metadata(metadata)
|
||||
return False
|
||||
@@ -154,12 +165,11 @@ class MinerUConverter:
|
||||
shutil.rmtree(temp_output_dir, ignore_errors=True)
|
||||
|
||||
def convert_all_pending(
|
||||
self, retry_failed: bool = False, force: bool = False
|
||||
self, retry_failed: bool = False, force: bool = False, use_ui: bool = True
|
||||
) -> tuple[int, int]:
|
||||
"""Convert papers based on their conversion status."""
|
||||
success_count = 0
|
||||
failure_count = 0
|
||||
|
||||
# Find papers to convert
|
||||
papers_to_convert = []
|
||||
for metadata in self.storage_manager.list_all_papers():
|
||||
should_convert = False
|
||||
|
||||
@@ -174,9 +184,85 @@ class MinerUConverter:
|
||||
should_convert = True
|
||||
|
||||
if should_convert:
|
||||
papers_to_convert.append(metadata)
|
||||
|
||||
if not papers_to_convert:
|
||||
return 0, 0
|
||||
|
||||
# Use rich UI for multiple papers or when explicitly requested
|
||||
if use_ui and len(papers_to_convert) > 0:
|
||||
conversion_ui = ConversionUI()
|
||||
return conversion_ui.run_conversion_with_ui(
|
||||
papers_to_convert, self.convert_paper, self.storage_manager
|
||||
)
|
||||
else:
|
||||
# Fallback to simple conversion without UI
|
||||
success_count = 0
|
||||
failure_count = 0
|
||||
|
||||
for metadata in papers_to_convert:
|
||||
if self.convert_paper(metadata):
|
||||
success_count += 1
|
||||
else:
|
||||
failure_count += 1
|
||||
|
||||
return success_count, failure_count
|
||||
return success_count, failure_count
|
||||
|
||||
def _post_process_markdown(self, markdown_path: Path) -> None:
|
||||
"""Post-process the markdown file to fix image references and other issues."""
|
||||
try:
|
||||
# Read the original markdown content
|
||||
content = markdown_path.read_text(encoding="utf-8")
|
||||
|
||||
# Fix image references: images/ -> assets/
|
||||
# This handles both  and 
|
||||
import re
|
||||
|
||||
content = re.sub(
|
||||
r"!\[([^\]]*)\]\(images/", # Match 
|
||||
|
||||
# Also handle standalone image references without alt text
|
||||
content = re.sub(
|
||||
r"!\[\]\(images/", # Match 
|
||||
|
||||
# Apply additional cleanup
|
||||
content = self._clean_markdown_content(content)
|
||||
|
||||
# Write the modified content back
|
||||
markdown_path.write_text(content, encoding="utf-8")
|
||||
|
||||
self.logger.info("Post-processed markdown file: fixed image references")
|
||||
|
||||
except Exception as e:
|
||||
# Don't fail conversion if post-processing fails
|
||||
self.logger.warning(f"Failed to post-process markdown: {e}")
|
||||
|
||||
def _clean_markdown_content(self, content: str) -> str:
|
||||
"""Additional markdown cleanup (extensible for future needs)."""
|
||||
# Remove or fix common MinerU artifacts
|
||||
lines = content.split("\n")
|
||||
cleaned_lines = []
|
||||
|
||||
for line in lines:
|
||||
# Skip empty lines with just whitespace
|
||||
if line.strip() == "":
|
||||
cleaned_lines.append("")
|
||||
continue
|
||||
|
||||
# Remove excessive whitespace
|
||||
line = " ".join(line.split())
|
||||
|
||||
# TODO: Add more cleanup rules here as needed
|
||||
# - Fix table formatting
|
||||
# - Clean up figure captions
|
||||
# - Remove processing artifacts
|
||||
|
||||
cleaned_lines.append(line)
|
||||
|
||||
return "\n".join(cleaned_lines)
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
"""Rich UI components for paperlib."""
|
||||
|
||||
from .converter_ui import ConversionUI
|
||||
|
||||
__all__ = ["ConversionUI"]
|
||||
@@ -0,0 +1,234 @@
|
||||
"""Rich UI for PDF conversion progress."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from queue import Empty, Queue
|
||||
|
||||
from rich.console import Console
|
||||
from rich.live import Live
|
||||
from rich.panel import Panel
|
||||
from rich.progress import BarColumn, Progress, TaskID, TextColumn, TimeRemainingColumn
|
||||
from rich.table import Table
|
||||
|
||||
|
||||
class ConversionUI:
|
||||
"""Rich UI for displaying conversion progress and MinerU output."""
|
||||
|
||||
def __init__(self, console: Console | None = None):
|
||||
self.console = console or Console()
|
||||
self.progress = Progress(
|
||||
TextColumn("[bold blue]{task.description}"),
|
||||
BarColumn(bar_width=40),
|
||||
"[progress.percentage]{task.percentage:>3.0f}%",
|
||||
"•",
|
||||
TextColumn("{task.completed}/{task.total} papers"),
|
||||
"•",
|
||||
TimeRemainingColumn(),
|
||||
console=self.console,
|
||||
)
|
||||
self.output_lines = []
|
||||
self.max_output_lines = 15 # Show last 15 lines of output
|
||||
|
||||
def create_display_table(self, task_id: TaskID, current_paper: str = "") -> Table:
|
||||
"""Create the main display table with progress and output."""
|
||||
table = Table.grid()
|
||||
|
||||
# Progress section
|
||||
progress_panel = Panel(
|
||||
self.progress, title="[bold green]Conversion Progress", border_style="green"
|
||||
)
|
||||
table.add_row(progress_panel)
|
||||
|
||||
# Current paper info
|
||||
if current_paper:
|
||||
current_panel = Panel(
|
||||
f"[bold yellow]Converting: {current_paper}", border_style="yellow"
|
||||
)
|
||||
table.add_row(current_panel)
|
||||
|
||||
# MinerU output section
|
||||
output_text = (
|
||||
"\n".join(self.output_lines[-self.max_output_lines :])
|
||||
or "[dim]Waiting for output..."
|
||||
)
|
||||
output_panel = Panel(
|
||||
output_text,
|
||||
title="[bold cyan]MinerU Output",
|
||||
border_style="cyan",
|
||||
height=self.max_output_lines + 2, # +2 for border
|
||||
)
|
||||
table.add_row(output_panel)
|
||||
|
||||
return table
|
||||
|
||||
def run_conversion_with_ui(
|
||||
self, papers_to_convert: list, convert_func, storage_manager=None
|
||||
):
|
||||
"""Run conversion with rich UI display."""
|
||||
if not papers_to_convert:
|
||||
self.console.print("[yellow]No papers to convert.")
|
||||
return 0, 0
|
||||
|
||||
# Get storage manager from converter or use passed one
|
||||
if storage_manager is None:
|
||||
try:
|
||||
storage_manager = convert_func.__self__.storage_manager
|
||||
except AttributeError:
|
||||
# Fallback for mocked functions
|
||||
storage_manager = None
|
||||
|
||||
# Initialize progress
|
||||
task_id = self.progress.add_task(
|
||||
"Converting papers...", total=len(papers_to_convert)
|
||||
)
|
||||
|
||||
success_count = 0
|
||||
failure_count = 0
|
||||
|
||||
with Live(
|
||||
self.create_display_table(task_id),
|
||||
console=self.console,
|
||||
refresh_per_second=4,
|
||||
vertical_overflow="visible",
|
||||
) as live:
|
||||
for _i, metadata in enumerate(papers_to_convert):
|
||||
# Update current paper info
|
||||
current_paper = f"{metadata.paper_id} - {metadata.title[:50]}..."
|
||||
|
||||
# Clear previous output for new paper
|
||||
self.output_lines = [f"Starting conversion of {metadata.paper_id}..."]
|
||||
|
||||
# Update display
|
||||
live.update(self.create_display_table(task_id, current_paper))
|
||||
|
||||
# Run conversion with output streaming
|
||||
if self._convert_with_streaming_output(
|
||||
metadata,
|
||||
convert_func,
|
||||
storage_manager,
|
||||
live,
|
||||
task_id,
|
||||
current_paper,
|
||||
):
|
||||
success_count += 1
|
||||
self.output_lines.append(
|
||||
"[bold green]✓ Conversion completed successfully"
|
||||
)
|
||||
else:
|
||||
failure_count += 1
|
||||
self.output_lines.append("[bold red]✗ Conversion failed")
|
||||
|
||||
# Update progress
|
||||
self.progress.update(task_id, advance=1)
|
||||
live.update(self.create_display_table(task_id, current_paper))
|
||||
|
||||
# Brief pause to show result
|
||||
time.sleep(0.5)
|
||||
|
||||
return success_count, failure_count
|
||||
|
||||
def _convert_with_streaming_output(
|
||||
self, metadata, convert_func, storage_manager, live, task_id, current_paper
|
||||
):
|
||||
"""Convert a single paper with streaming output."""
|
||||
# Get paper paths for log streaming
|
||||
if storage_manager:
|
||||
paths = storage_manager.get_paper_paths(
|
||||
metadata.paper_id, metadata.source_type
|
||||
)
|
||||
log_file = paths["logs"] / "mineru.log"
|
||||
else:
|
||||
# Fallback when storage manager not available (testing)
|
||||
log_file = None
|
||||
|
||||
# Start conversion in background thread
|
||||
result_queue = Queue()
|
||||
|
||||
def run_conversion():
|
||||
try:
|
||||
result = convert_func(metadata)
|
||||
result_queue.put(result)
|
||||
except Exception:
|
||||
result_queue.put(False)
|
||||
|
||||
# Start conversion thread
|
||||
conversion_thread = threading.Thread(target=run_conversion)
|
||||
conversion_thread.start()
|
||||
|
||||
# Stream output while conversion runs
|
||||
last_size = 0
|
||||
while conversion_thread.is_alive():
|
||||
if log_file and log_file.exists():
|
||||
try:
|
||||
# Read new content from log file
|
||||
current_content = log_file.read_text(
|
||||
encoding="utf-8", errors="ignore"
|
||||
)
|
||||
|
||||
if len(current_content) > last_size:
|
||||
# Get new lines
|
||||
new_content = current_content[last_size:]
|
||||
new_lines = new_content.strip().split("\n")
|
||||
|
||||
for line in new_lines:
|
||||
if line.strip():
|
||||
# Add line with some formatting
|
||||
formatted_line = self._format_mineru_output_line(line)
|
||||
self.output_lines.append(formatted_line)
|
||||
|
||||
# Keep only recent lines
|
||||
if len(self.output_lines) > 50:
|
||||
self.output_lines = self.output_lines[-30:]
|
||||
|
||||
last_size = len(current_content)
|
||||
|
||||
# Update display
|
||||
live.update(self.create_display_table(task_id, current_paper))
|
||||
|
||||
except Exception:
|
||||
# Ignore file read errors (file might be locked)
|
||||
pass
|
||||
|
||||
time.sleep(0.2) # Check for updates 5 times per second
|
||||
|
||||
# Wait for thread to complete and get result
|
||||
conversion_thread.join()
|
||||
|
||||
try:
|
||||
return result_queue.get_nowait()
|
||||
except Empty:
|
||||
return False
|
||||
|
||||
def _format_mineru_output_line(self, line: str) -> str:
|
||||
"""Format a line of MinerU output for display."""
|
||||
line = line.strip()
|
||||
|
||||
# Color code different types of output
|
||||
if "INFO" in line:
|
||||
return f"[dim]{line}"
|
||||
elif "ERROR" in line or "Failed" in line:
|
||||
return f"[red]{line}"
|
||||
elif "WARNING" in line or "WARN" in line:
|
||||
return f"[yellow]{line}"
|
||||
elif "%" in line or "it/s" in line:
|
||||
# Progress indicators
|
||||
return f"[blue]{line}"
|
||||
elif "Fetching" in line:
|
||||
return f"[cyan]{line}"
|
||||
else:
|
||||
return line
|
||||
|
||||
def show_simple_progress(self, message: str, total: int) -> tuple[TaskID, Live]:
|
||||
"""Show a simple progress bar for operations without streaming output."""
|
||||
task_id = self.progress.add_task(message, total=total)
|
||||
|
||||
display = Panel(
|
||||
self.progress, title="[bold green]paperlib", border_style="green"
|
||||
)
|
||||
|
||||
live = Live(display, console=self.console, refresh_per_second=10)
|
||||
live.start()
|
||||
|
||||
return task_id, live
|
||||
+11
-5
@@ -205,10 +205,12 @@ class TestCLI:
|
||||
"""Test convert command with no papers."""
|
||||
self.run_paperlib_cmd("init", str(temp_library))
|
||||
|
||||
result = self.run_paperlib_cmd("convert", "--library", str(temp_library))
|
||||
result = self.run_paperlib_cmd(
|
||||
"convert", "--no-ui", "--library", str(temp_library)
|
||||
)
|
||||
|
||||
assert result.returncode == 0
|
||||
assert "Complete: 0 successful, 0 failed" in result.stdout
|
||||
assert "Converted pending: 0 successful, 0 failed" in result.stdout
|
||||
|
||||
def test_convert_command_with_papers_no_mineru(self, temp_library, sample_pdf):
|
||||
"""Test convert command with papers when MinerU is not available."""
|
||||
@@ -218,11 +220,15 @@ class TestCLI:
|
||||
"import", "--pdf", str(sample_pdf), "--library", str(temp_library)
|
||||
)
|
||||
|
||||
# Convert (will fail because MinerU command may not be properly set up)
|
||||
result = self.run_paperlib_cmd("convert", "--library", str(temp_library))
|
||||
# Convert without UI (will fail because MinerU command may not be properly set up)
|
||||
result = self.run_paperlib_cmd(
|
||||
"convert", "--no-ui", "--library", str(temp_library)
|
||||
)
|
||||
|
||||
# Should complete but may have failures due to MinerU setup
|
||||
assert "Complete:" in result.stdout
|
||||
assert ("Converted pending:" in result.stdout) or (
|
||||
"Converting papers" in result.stdout
|
||||
)
|
||||
|
||||
def test_invalid_command(self):
|
||||
"""Test invalid command."""
|
||||
|
||||
@@ -0,0 +1,92 @@
|
||||
"""Tests for converter UI functionality."""
|
||||
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
from rich.console import Console
|
||||
|
||||
from paperlib.ui import ConversionUI
|
||||
|
||||
|
||||
class TestConversionUI:
|
||||
"""Test ConversionUI functionality."""
|
||||
|
||||
@pytest.fixture
|
||||
def ui(self):
|
||||
"""Create a ConversionUI instance for testing."""
|
||||
# Use a console that doesn't output to terminal during tests
|
||||
console = Console(file=open("/dev/null", "w"), force_terminal=True)
|
||||
return ConversionUI(console=console)
|
||||
|
||||
@pytest.fixture
|
||||
def mock_papers(self):
|
||||
"""Create mock paper metadata for testing."""
|
||||
papers = []
|
||||
for i in range(3):
|
||||
paper = Mock()
|
||||
paper.paper_id = f"test-paper-{i + 1}"
|
||||
paper.title = f"Test Paper Title {i + 1}"
|
||||
papers.append(paper)
|
||||
return papers
|
||||
|
||||
def test_format_mineru_output_line(self, ui):
|
||||
"""Test formatting of MinerU output lines."""
|
||||
# Test INFO line
|
||||
info_line = "2026-04-17 17:46:01.450 | INFO | Processing started"
|
||||
formatted = ui._format_mineru_output_line(info_line)
|
||||
assert "[dim]" in formatted
|
||||
|
||||
# Test ERROR line
|
||||
error_line = "ERROR: Conversion failed"
|
||||
formatted = ui._format_mineru_output_line(error_line)
|
||||
assert "[red]" in formatted
|
||||
|
||||
# Test WARNING line
|
||||
warning_line = "WARNING: Low memory"
|
||||
formatted = ui._format_mineru_output_line(warning_line)
|
||||
assert "[yellow]" in formatted
|
||||
|
||||
# Test progress line
|
||||
progress_line = "Layout Predict: 50%|█████ | 22/44 [00:15<00:15, 1.44it/s]"
|
||||
formatted = ui._format_mineru_output_line(progress_line)
|
||||
assert "[blue]" in formatted
|
||||
|
||||
# Test fetching line (may be colored blue due to % character)
|
||||
fetch_line = "Fetching 7 files: 100%|██████████| 7/7"
|
||||
formatted = ui._format_mineru_output_line(fetch_line)
|
||||
assert ("[cyan]" in formatted) or (
|
||||
"[blue]" in formatted
|
||||
) # Either color is fine
|
||||
|
||||
@patch("threading.Thread")
|
||||
@patch("time.sleep")
|
||||
def test_run_conversion_with_ui_empty(self, mock_sleep, mock_thread, ui):
|
||||
"""Test UI with no papers to convert."""
|
||||
result = ui.run_conversion_with_ui([], lambda x: True)
|
||||
assert result == (0, 0)
|
||||
|
||||
def test_create_display_table(self, ui):
|
||||
"""Test creating the display table."""
|
||||
task_id = ui.progress.add_task("test", total=1)
|
||||
|
||||
# Test without current paper
|
||||
table = ui.create_display_table(task_id)
|
||||
assert table is not None
|
||||
|
||||
# Test with current paper
|
||||
table = ui.create_display_table(task_id, "test-paper-1 - Sample Title")
|
||||
assert table is not None
|
||||
|
||||
def test_output_line_management(self, ui):
|
||||
"""Test that output lines are properly managed."""
|
||||
# Add many lines
|
||||
for i in range(60):
|
||||
ui.output_lines.append(f"Line {i}")
|
||||
|
||||
# The list can grow beyond 50, but display is limited to last 15 lines
|
||||
assert len(ui.output_lines) == 60
|
||||
|
||||
# Check that display shows only recent lines
|
||||
recent_lines = ui.output_lines[-ui.max_output_lines :]
|
||||
assert len(recent_lines) == ui.max_output_lines
|
||||
@@ -0,0 +1,219 @@
|
||||
"""Tests for MinerU markdown post-processing."""
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from paperlib.config import LibraryPaths
|
||||
from paperlib.converter import MinerUConverter
|
||||
from paperlib.storage import PaperStorageManager
|
||||
|
||||
|
||||
class TestMinerUPostProcess:
|
||||
"""Test MinerU markdown post-processing functionality."""
|
||||
|
||||
@pytest.fixture
|
||||
def temp_library(self):
|
||||
"""Create a temporary library for testing."""
|
||||
temp_dir = Path("./.tmp") / f"test_postprocess_{hash(self)}"
|
||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
library_paths = LibraryPaths.from_root(temp_dir)
|
||||
library_paths.create_directories()
|
||||
return library_paths
|
||||
|
||||
@pytest.fixture
|
||||
def converter(self, temp_library):
|
||||
"""Create a MinerUConverter for testing."""
|
||||
storage_manager = PaperStorageManager(temp_library)
|
||||
return MinerUConverter(storage_manager)
|
||||
|
||||
def test_image_reference_replacement(self, converter):
|
||||
"""Test that image references are correctly updated."""
|
||||
# Create test markdown content with various image reference formats
|
||||
test_content = """# Test Document
|
||||
|
||||
Here's an image with alt text:
|
||||

|
||||
|
||||
Here's an image without alt text:
|
||||

|
||||
|
||||
Some text content.
|
||||
|
||||
Here's another image:
|
||||

|
||||
|
||||
This should not be changed:
|
||||

|
||||
|
||||
And this local reference should not change:
|
||||

|
||||
"""
|
||||
|
||||
expected_content = """# Test Document
|
||||
|
||||
Here's an image with alt text:
|
||||

|
||||
|
||||
Here's an image without alt text:
|
||||

|
||||
|
||||
Some text content.
|
||||
|
||||
Here's another image:
|
||||

|
||||
|
||||
This should not be changed:
|
||||

|
||||
|
||||
And this local reference should not change:
|
||||

|
||||
"""
|
||||
|
||||
# Create temporary file
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".md", delete=False, encoding="utf-8"
|
||||
) as tmp:
|
||||
tmp.write(test_content)
|
||||
tmp_path = Path(tmp.name)
|
||||
|
||||
try:
|
||||
# Apply post-processing
|
||||
converter._post_process_markdown(tmp_path)
|
||||
|
||||
# Read the result
|
||||
result_content = tmp_path.read_text(encoding="utf-8")
|
||||
|
||||
# Verify image references were updated correctly
|
||||
assert "" in result_content
|
||||
assert (
|
||||
""
|
||||
in result_content
|
||||
)
|
||||
|
||||
# Verify external and local references were NOT changed
|
||||
assert "https://example.com/image.jpg" in result_content
|
||||
assert "./local_images/test.png" in result_content
|
||||
|
||||
# Verify no "images/" references remain
|
||||
assert "](images/" not in result_content
|
||||
|
||||
finally:
|
||||
if tmp_path.exists():
|
||||
tmp_path.unlink()
|
||||
|
||||
def test_markdown_content_cleaning(self, converter):
|
||||
"""Test markdown content cleaning functionality."""
|
||||
test_content = """# Title with Extra Spaces
|
||||
|
||||
|
||||
Here's a paragraph with multiple spaces.
|
||||
|
||||
Indented line with tabs and spaces.
|
||||
|
||||
|
||||
Another paragraph.
|
||||
|
||||
|
||||
|
||||
Too many blank lines above.
|
||||
"""
|
||||
|
||||
expected_cleaned = """# Title with Extra Spaces
|
||||
|
||||
|
||||
Here's a paragraph with multiple spaces.
|
||||
|
||||
Indented line with tabs and spaces.
|
||||
|
||||
|
||||
Another paragraph.
|
||||
|
||||
|
||||
|
||||
Too many blank lines above.
|
||||
"""
|
||||
|
||||
result = converter._clean_markdown_content(test_content)
|
||||
|
||||
# Check that excessive whitespace within lines is cleaned
|
||||
lines = result.split("\n")
|
||||
for line in lines:
|
||||
if line.strip(): # Non-empty lines
|
||||
# Should not have multiple consecutive spaces
|
||||
assert " " not in line or line.startswith(
|
||||
" "
|
||||
) # Except for code blocks
|
||||
|
||||
def test_post_process_error_handling(self, converter):
|
||||
"""Test that post-processing errors don't crash conversion."""
|
||||
# Test with non-existent file
|
||||
fake_path = Path("./.tmp/nonexistent.md")
|
||||
|
||||
# Should not raise exception
|
||||
converter._post_process_markdown(fake_path)
|
||||
|
||||
# Test with unreadable file (permission issue simulation)
|
||||
with tempfile.NamedTemporaryFile(suffix=".md", delete=False) as tmp:
|
||||
tmp_path = Path(tmp.name)
|
||||
|
||||
try:
|
||||
# Create file then make it unreadable by removing it
|
||||
tmp_path.unlink()
|
||||
|
||||
# Should handle gracefully
|
||||
converter._post_process_markdown(tmp_path)
|
||||
|
||||
finally:
|
||||
# Cleanup if file somehow still exists
|
||||
if tmp_path.exists():
|
||||
tmp_path.unlink()
|
||||
|
||||
def test_complex_image_patterns(self, converter):
|
||||
"""Test complex image reference patterns."""
|
||||
test_content = """
|
||||
Various image patterns:
|
||||
|
||||

|
||||

|
||||

|
||||

|
||||
.jpg)
|
||||

|
||||
|
||||
Non-image patterns that should not change:
|
||||
[Link text](images/not-an-image)
|
||||
`code with images/path`
|
||||
code block with images/reference
|
||||
"""
|
||||
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".md", delete=False, encoding="utf-8"
|
||||
) as tmp:
|
||||
tmp.write(test_content)
|
||||
tmp_path = Path(tmp.name)
|
||||
|
||||
try:
|
||||
converter._post_process_markdown(tmp_path)
|
||||
result = tmp_path.read_text(encoding="utf-8")
|
||||
|
||||
# Verify all image references were updated
|
||||
assert "" in result
|
||||
assert "" in result
|
||||
assert "" in result
|
||||
assert "" in result
|
||||
assert ".jpg)" in result
|
||||
|
||||
# Verify non-image patterns were preserved
|
||||
assert "[Link text](images/not-an-image)" in result
|
||||
assert "`code with images/path`" in result
|
||||
assert (
|
||||
"code block with images/reference" in result
|
||||
) # Leading spaces may be removed by cleaning
|
||||
|
||||
finally:
|
||||
if tmp_path.exists():
|
||||
tmp_path.unlink()
|
||||
Reference in New Issue
Block a user