Step 4
APScheduler + KST schedules
25 min
A popular in-process Python scheduler; easier than system cron to version and manage inside the repo.
1. Install
```shell
uv add apscheduler pytz  # pytz is imported directly in the snippets below
```
2. Basic async scheduler
```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import pytz

KST = pytz.timezone("Asia/Seoul")
scheduler = AsyncIOScheduler(timezone=KST)

@scheduler.scheduled_job(CronTrigger(hour=3, minute=0), id="daily-nps-crawl")
async def daily_crawl():
    await crawl_nps()

scheduler.start()  # jobs fire once the asyncio event loop is running
```
3. Triggers
```python
from datetime import datetime
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger

CronTrigger(hour=3, minute=0)                      # every day at 03:00
IntervalTrigger(hours=1)                           # every hour
DateTrigger(run_date=datetime(2026, 5, 10, 9, 0))  # one-shot at a fixed time
CronTrigger(day_of_week="mon,wed,fri", hour=3)     # Mon/Wed/Fri at 03:00
```
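What a daily `CronTrigger(hour=3)` resolves to can be sketched in stdlib terms. This is a simplified illustration, not APScheduler's actual implementation; `next_daily_run` is a hypothetical helper:

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo  # stdlib, Python 3.9+

KST = ZoneInfo("Asia/Seoul")

def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Next wall-clock occurrence of hour:minute in `now`'s timezone."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot already passed -> tomorrow
    return candidate

print(next_daily_run(datetime(2026, 5, 10, 4, 30, tzinfo=KST), 3))
# 2026-05-11 03:00 KST: 04:30 is past 03:00, so the next run is tomorrow
```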
4. Idempotency options
```python
JOB_DEFAULTS = {
    "max_instances": 1,         # never run two overlapping instances of a job
    "coalesce": True,           # collapse a backlog of missed runs into one
    "misfire_grace_time": 300,  # still run if the wakeup is up to 5 min late
}
scheduler = AsyncIOScheduler(timezone=KST, job_defaults=JOB_DEFAULTS)

# Note: replace_existing is not a job default; pass it per add_job() call:
# scheduler.add_job(daily_crawl, trigger, id="daily-nps-crawl", replace_existing=True)
```
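How `coalesce` and `misfire_grace_time` interact can be sketched as a pure function. This is a simplified model of the behavior, not APScheduler's internals; `plan_runs` is a hypothetical helper:

```python
from datetime import datetime, timedelta

def plan_runs(missed, now, misfire_grace_time, coalesce):
    """Which of the missed fire times should actually run at `now`."""
    grace = timedelta(seconds=misfire_grace_time)
    runnable = [t for t in missed if now - t <= grace]  # drop runs that are too stale
    if coalesce and runnable:
        return [runnable[-1]]  # collapse the backlog into a single run
    return runnable

now = datetime(2026, 5, 10, 3, 10)
missed = [datetime(2026, 5, 10, 3, 0), datetime(2026, 5, 10, 3, 5)]
print(plan_runs(missed, now, misfire_grace_time=300, coalesce=True))
```

With a 5-minute grace, the 03:00 firing is already too stale and only the 03:05 one runs; without `coalesce`, every firing still inside the grace window would run back to back.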
5. Wire to FastAPI lifespan
```python
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.start()              # start jobs on the app's event loop
    yield
    scheduler.shutdown(wait=True)  # let in-flight jobs finish on shutdown

app = FastAPI(lifespan=lifespan)
```
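The start/serve/shutdown ordering can be exercised without FastAPI at all. A stdlib sketch with a stand-in scheduler (`FakeScheduler` and the `events` list are illustrative, not part of any library):

```python
import asyncio
from contextlib import asynccontextmanager

events = []

class FakeScheduler:  # stand-in for AsyncIOScheduler
    def start(self):
        events.append("start")
    def shutdown(self, wait=True):
        events.append("shutdown")

scheduler = FakeScheduler()

@asynccontextmanager
async def lifespan(app):
    scheduler.start()              # runs before the app serves requests
    yield
    scheduler.shutdown(wait=True)  # runs after the last request

async def main():
    async with lifespan(None):
        events.append("serving")   # FastAPI would handle requests here

asyncio.run(main())
print(events)  # ['start', 'serving', 'shutdown']
```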
6. Manual trigger
```python
from datetime import datetime
from fastapi import HTTPException

@app.post("/admin/jobs/{job_id}/run")
async def trigger(job_id: str):
    job = scheduler.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="unknown job")
    job.modify(next_run_time=datetime.now(KST))  # fire on the next scheduler wakeup
    return {"ok": True}
```
7. Multi-instance — distributed lock
```python
async def crawl_with_lock():
    # redis_lock is an app-level helper (e.g. Redis SET NX EX under the hood);
    # only one instance acquires the lock, the others skip this run
    async with redis_lock("lock:daily-nps-crawl", ttl=3600):
        await crawl_nps()
```
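The lock semantics the `redis_lock` helper relies on (Redis `SET key value NX EX ttl`) can be sketched with an in-memory dict standing in for Redis. A simplified single-process model for illustration only; `acquire`/`release`/`store` are hypothetical names:

```python
import time

store = {}  # key -> expiry timestamp; stands in for Redis

def acquire(key, ttl, now=None):
    """SET key NX EX ttl semantics: succeed only if absent or expired."""
    now = time.time() if now is None else now
    if key in store and store[key] > now:
        return False           # someone else holds a live lock
    store[key] = now + ttl     # claim the lock with a TTL
    return True

def release(key):
    store.pop(key, None)

# two instances racing for the same daily crawl
assert acquire("lock:daily-nps-crawl", ttl=3600, now=0.0)       # first instance wins
assert not acquire("lock:daily-nps-crawl", ttl=3600, now=10.0)  # second instance skips
assert acquire("lock:daily-nps-crawl", ttl=3600, now=4000.0)    # TTL expired, lock free again
```

The TTL matters: if a crashed holder never releases, the lock expires on its own instead of blocking every future run.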
8. Persistent jobstore
```python
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

scheduler = AsyncIOScheduler(
    jobstores={"default": SQLAlchemyJobStore(url="postgresql://...")},
    timezone=KST,
)
```

Often overkill for a single service: decorator-based registration re-creates jobs at startup anyway, and a persistent store additionally requires job callables to be importable by reference so they can be serialized.
9. Record outcomes
```python
import time

@scheduler.scheduled_job(CronTrigger(hour=3), id="nps")
async def nps_job():
    start = time.time()
    try:
        rows = await crawl_nps()
        await db.execute(
            "INSERT INTO crawl_runs (source, status, rows, duration_ms)"
            " VALUES ($1, $2, $3, $4)",
            "nps", "ok", rows, int((time.time() - start) * 1000),
        )
    except Exception as e:
        await db.execute(
            "INSERT INTO crawl_runs (source, status, error) VALUES ($1, $2, $3)",
            "nps", "fail", str(e),
        )
        raise  # keep the failure visible to APScheduler's error listeners
```
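One possible shape for the `crawl_runs` table the inserts above assume (a hypothetical schema sketch; adjust names and types to your own conventions):

```sql
-- hypothetical DDL for the crawl_runs audit table
CREATE TABLE crawl_runs (
    id          bigserial PRIMARY KEY,
    source      text        NOT NULL,
    status      text        NOT NULL,  -- 'ok' or 'fail'
    rows        integer,
    duration_ms integer,
    error       text,
    created_at  timestamptz NOT NULL DEFAULT now()
);
```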
10. Gotchas
- Missing timezone → the scheduler falls back to the server's local zone; in a UTC container, "03:00" means 03:00 UTC
- Default misfire_grace_time (1 second) too small → a late wakeup silently skips the run
- Multiple instances without a lock → duplicate runs
- Async jobs on a sync scheduler → coroutines are never awaited; use AsyncIOScheduler for async def jobs
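The first gotcha in concrete numbers: with stdlib zoneinfo, 03:00 KST is 18:00 UTC on the previous day, so a scheduler that silently falls back to UTC fires the "03:00" job nine hours off, on the wrong calendar date:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

KST = ZoneInfo("Asia/Seoul")

run_kst = datetime(2026, 5, 10, 3, 0, tzinfo=KST)  # intended: 03:00 KST
run_utc = run_kst.astimezone(timezone.utc)

print(run_utc)  # 2026-05-09 18:00:00+00:00, the previous day in UTC
```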
Closing
APScheduler with an explicit KST timezone, plus the four idempotency options above, is a solid default for Python backend scheduling.
Next
- 05-incremental-dedup