update: a framework to call main() according to crontab

This commit is contained in:
2026-04-11 03:23:50 -04:00
parent 15b3c7b3ea
commit b8f7b70851
3 changed files with 115 additions and 1 deletions
+8
View File
@@ -0,0 +1,8 @@
[general]
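# Crontab schedule: "second minute hour day month weekday" (a 5-field expression without the seconds column is also accepted)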
crontab = "1,5 * * * * *"
[arxiv]
# Maximum number of days to look back when searching arXiv.
# Note: with a value of 7 we search up to 7 days back, but nothing is downloaded twice.
fetch_period = 7
+16 -1
View File
@@ -1,6 +1,21 @@
from __future__ import annotations
from datetime import datetime
from time import sleep
from src.utils import load_config, next_cron_match
def main() -> None:
    """Run one scheduled job; for now this only prints a greeting to stdout."""
    print("Hello from arxiv-bot!")
    # Placeholder body until the real job is implemented.
    # NOTE(review): the greeting is printed twice as shown here; this may be a
    # removed/added line pair from the diff rendering -- confirm against the file.
    print("Hello from arxiv-bot!")
if __name__ == "__main__":
    # The schedule is read once at start-up; editing the config file later
    # has no effect until the process is restarted.
    config = load_config()
    crontab = config["general"]["crontab"]

    # Scheduler loop: compute the next matching time, sleep until then, run
    # the job, repeat. There is no exit condition.
    while True:
        next_run = next_cron_match(crontab, datetime.now())
        # Re-read the clock so time spent computing next_run is not slept
        # twice; clamp at zero because sleep() rejects negative durations.
        delay = (next_run - datetime.now()).total_seconds()
        sleep(max(delay, 0.0))
        main()
+91
View File
@@ -0,0 +1,91 @@
from __future__ import annotations
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
import tomllib
def load_config(config_path: str | Path = "config.toml") -> dict[str, Any]:
"""Load a TOML config file and return its contents as a dictionary."""
path = Path(config_path)
if not path.is_absolute():
path = Path(__file__).resolve().parent.parent / path
if not path.is_file():
raise FileNotFoundError(f"Config file not found: {path}")
with path.open("rb") as f:
return tomllib.load(f)
def parse_cron_field(field: str, minimum: int, maximum: int) -> set[int]:
    """Expand one crontab column into the set of integers it selects.

    The column is a comma-separated list whose items may be ``*`` (the whole
    ``minimum``-``maximum`` span), a single number ``N``, or an inclusive
    range ``A-B``. Any item may carry a ``/S`` suffix that keeps every
    ``S``-th value counting from the item's start.

    Args:
        field: Text of the column, e.g. ``"*/15"`` or ``"1,5,10-20"``.
        minimum: Smallest value allowed for this column (inclusive).
        maximum: Largest value allowed for this column (inclusive).

    Returns:
        The selected values. Values outside ``minimum``-``maximum`` are
        dropped rather than rejected, so the result may be empty.

    Raises:
        ValueError: If a number cannot be parsed or a step is not a positive
            integer.
    """
    values: set[int] = set()
    for part in field.split(","):
        base, slash, step_text = part.partition("/")
        step = 1
        if slash:
            step = int(step_text)
            if step < 1:
                # A zero step would crash range() with an unrelated message
                # and a negative one would silently select nothing.
                raise ValueError(f"cron step must be a positive integer: {part!r}")

        if base == "*":
            start, end = minimum, maximum
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(base)

        # Cap the upper bound before expanding so an oversized range end
        # cannot materialise a huge intermediate set. The lower bound is left
        # alone because moving it would shift which values a step lands on.
        values.update(range(start, min(end, maximum) + 1, step))

    # Everything is already <= maximum; drop what falls below the minimum.
    return {value for value in values if value >= minimum}
def cron_matches(expr: str, dt: datetime) -> bool:
    """Tell whether ``dt`` satisfies the crontab expression ``expr``.

    Args:
        expr: Whitespace-separated crontab expression. Six fields are read as
            ``second minute hour day month weekday``; five fields omit the
            seconds column, which is then treated as ``0``.
        dt: The moment to test. Only its second, minute, hour, day, month and
            weekday are inspected.

    Returns:
        ``True`` when every column of ``expr`` selects the corresponding
        component of ``dt``.

    Raises:
        ValueError: If ``expr`` does not have 5 or 6 fields, or if any field
            cannot be parsed. Every field is parsed before matching, so a
            malformed expression raises regardless of ``dt``.
    """
    fields = expr.split()
    if len(fields) == 5:
        # Five-field form has no seconds column; it fires at second 0.
        fields = ["0", *fields]
    elif len(fields) != 6:
        raise ValueError("crontab must contain 5 or 6 fields")
    second, minute, hour, day, month, weekday = fields

    seconds = parse_cron_field(second, 0, 59)
    minutes = parse_cron_field(minute, 0, 59)
    hours = parse_cron_field(hour, 0, 23)
    days = parse_cron_field(day, 1, 31)
    months = parse_cron_field(month, 1, 12)

    # crontab(5) accepts both 0 and 7 for Sunday: parse over 0-7, then fold
    # 7 onto 0 so that e.g. "1-7" or "7" can match a Sunday.
    weekdays = parse_cron_field(weekday, 0, 7)
    if 7 in weekdays:
        weekdays = (weekdays - {7}) | {0}

    # datetime.weekday() counts Monday=0..Sunday=6; cron counts
    # Sunday=0..Saturday=6.
    cron_weekday = (dt.weekday() + 1) % 7

    # NOTE(review): all columns are AND-ed here. crontab(5) instead runs a job
    # when EITHER day-of-month or day-of-week matches if both are restricted
    # -- confirm which behaviour is intended.
    return (
        dt.second in seconds
        and dt.minute in minutes
        and dt.hour in hours
        and dt.day in days
        and dt.month in months
        and cron_weekday in weekdays
    )
def next_cron_match(expr: str, after: datetime) -> datetime:
    """Return the earliest datetime strictly later than ``after`` matching ``expr``.

    Args:
        expr: Crontab expression with 5 fields (minute resolution) or 6
            fields (leading seconds column, second resolution).
        after: Reference moment. Its microseconds (and, for 5-field
            expressions, its seconds) are discarded before searching.

    Returns:
        The first matching datetime, aligned to a whole second (6 fields) or
        a whole minute (5 fields).

    Raises:
        ValueError: If ``expr`` does not have 5 or 6 fields, if a field cannot
            be parsed, or if nothing matches within the search horizon of
            roughly eight years.
    """
    fields = expr.split()
    if len(fields) == 5:
        # No seconds column: it is implicitly "0" and the search starts at
        # the next whole minute.
        second = "0"
        minute, hour = fields[:2]
        candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    elif len(fields) == 6:
        second, minute, hour = fields[:3]
        candidate = after.replace(microsecond=0) + timedelta(seconds=1)
    else:
        raise ValueError("crontab must contain 5 or 6 fields")

    # Parse the time-of-day columns once so whole hours and minutes that
    # cannot match are skipped instead of probed one tick at a time.
    seconds = parse_cron_field(second, 0, 59)
    minutes = parse_cron_field(minute, 0, 59)
    hours = parse_cron_field(hour, 0, 23)

    # Eight years is long enough to reach the next 29 February even across a
    # non-leap century year; a 366-day horizon rejected that valid schedule.
    deadline = candidate + timedelta(days=8 * 366)
    while candidate <= deadline:
        if candidate.hour not in hours:
            candidate = candidate.replace(minute=0, second=0) + timedelta(hours=1)
        elif candidate.minute not in minutes:
            candidate = candidate.replace(second=0) + timedelta(minutes=1)
        elif candidate.second not in seconds:
            candidate += timedelta(seconds=1)
        elif cron_matches(expr, candidate):
            return candidate
        else:
            # The time of day matches, so the date columns rejected this
            # candidate; no other time on the same date can match either.
            candidate = candidate.replace(hour=0, minute=0, second=0) + timedelta(days=1)
    raise ValueError(f"Unable to find next run time for crontab: {expr}")