Step 5
Incremental collection · deduplication
25 min
Crawling everything every day wastes resources and invites bans. Crawl only what changed.
1. Checkpoint the last success
CREATE TABLE crawl_checkpoints (
    source VARCHAR PRIMARY KEY,
    last_run_at TIMESTAMPTZ,
    last_successful_id BIGINT
);
from datetime import datetime, timezone

# Read the last checkpoint; fall back to "crawl everything" on the first run.
last = await db.fetchrow(
    "SELECT last_run_at FROM crawl_checkpoints WHERE source = $1", "nps"
)
since = last["last_run_at"] if last else datetime(2000, 1, 1, tzinfo=timezone.utc)
new_data = await fetch_since(since)

# Advance the checkpoint only after the new data has been fetched and stored.
await db.execute(
    "INSERT INTO crawl_checkpoints (source, last_run_at) VALUES ($1, now()) "
    "ON CONFLICT (source) DO UPDATE SET last_run_at = EXCLUDED.last_run_at",
    "nps"
)
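If the process dies between storing the batch and writing the checkpoint, you either lose rows or re-crawl them. A minimal sketch that closes that gap, assuming save_batch (a hypothetical helper) writes through the same asyncpg connection:

# Sketch: make the save and the checkpoint advance atomic, so a crash
# can never move the checkpoint past data that was never stored.
async with db.transaction():
    await save_batch(db, new_data)
    await db.execute(
        "INSERT INTO crawl_checkpoints (source, last_run_at) VALUES ($1, now()) "
        "ON CONFLICT (source) DO UPDATE SET last_run_at = EXCLUDED.last_run_at",
        "nps"
    )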
2. Natural-key UNIQUE
CREATE TABLE companies (
    id BIGSERIAL PRIMARY KEY,
    business_no VARCHAR UNIQUE NOT NULL,
    name VARCHAR NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT now()
);

INSERT INTO companies (business_no, name)
VALUES ($1, $2)
ON CONFLICT (business_no) DO UPDATE
SET name = EXCLUDED.name, updated_at = now();
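With the constraint in place, whole batches can go through the same upsert from Python. A sketch using asyncpg's executemany, where rows is assumed to be a list of (business_no, name) tuples from the current run:

# Sketch: batch upsert; duplicates within or across runs collapse
# onto the natural key instead of piling up as new rows.
await db.executemany(
    "INSERT INTO companies (business_no, name) VALUES ($1, $2) "
    "ON CONFLICT (business_no) DO UPDATE "
    "SET name = EXCLUDED.name, updated_at = now()",
    rows
)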
3. Content hash
import hashlib

def content_hash(text):
    return hashlib.sha256(text.encode()).hexdigest()

async def upsert_article(db, url, article):
    new_hash = content_hash(article.body)
    row = await db.fetchrow("SELECT content_hash FROM articles WHERE url = $1", url)
    if row and row["content_hash"] == new_hash:
        return  # body unchanged: skip the write entirely
    await db.execute(
        "INSERT INTO articles (url, body, content_hash) VALUES ($1, $2, $3) "
        "ON CONFLICT (url) DO UPDATE SET body = $2, content_hash = $3, updated_at = now()",
        url, article.body, new_hash
    )
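Whitespace and markup churn will flip the hash even when nothing meaningful changed (see the gotchas below). A minimal normalization pass, applied before hashing, keeps the comparison stable:

import re

def normalize(text):
    # Collapse whitespace runs and strip edges so cosmetic
    # reformatting does not count as a content change.
    return re.sub(r"\s+", " ", text).strip()

new_hash = content_hash(normalize(article.body))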
4. ?since= params
resp = await client.get("https://opendart.fss.or.kr/list", params={
    "corp_code": "...",
    "bgn_de": since.strftime("%Y%m%d"),
    "end_de": today.strftime("%Y%m%d"),
})
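Date-filtered APIs can miss records that were committed after your last run but carry an earlier date. A common hedge (an assumption here, not documented DART behavior) is to re-fetch a small overlap window and let the upsert layer absorb the duplicates:

from datetime import timedelta

# Sketch: re-fetch one extra day; ON CONFLICT makes the overlap harmless.
bgn_de = (since - timedelta(days=1)).strftime("%Y%m%d")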
5. Paginate with a cap
async def crawl_all_pages(source):
    page = 1
    while True:
        items = await fetch_page(source, page)
        if not items:
            break
        await save_batch(items)
        page += 1
        if page > 1000:  # hard cap: a bug upstream shouldn't crawl forever
            logger.warning("exceeded page limit", source=source)
            break
6. Parallel + dedup
seen = set()
unique = []
for item in items:
    if item.id not in seen:
        seen.add(item.id)
        unique.append(item)
Or rely on DB UNIQUE + ON CONFLICT.
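A sketch of the parallel side, reusing fetch_page from section 5: fan out with asyncio.gather, then dedup in memory before one batched write:

import asyncio

async def fetch_pages_parallel(source, pages):
    # Fetch all pages concurrently; gather preserves input order.
    batches = await asyncio.gather(*(fetch_page(source, p) for p in pages))
    seen, unique = set(), []
    for batch in batches:
        for item in batch:
            if item.id not in seen:
                seen.add(item.id)
                unique.append(item)
    return unique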
7. Delete detection
ALTER TABLE companies ADD COLUMN last_seen_at TIMESTAMPTZ;

-- Touch the timestamp every time the crawler sees the row:
UPDATE companies SET last_seen_at = now() WHERE business_no = $1;

-- Candidates that have disappeared from the source:
SELECT * FROM companies WHERE last_seen_at < now() - interval '30 days';
Prefer a soft flag (is_active) over hard deletes, as in the sketch below.
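A sketch of the flag mechanics, assuming an is_active BOOLEAN column has been added alongside last_seen_at: rows that vanish from the source are flagged, not deleted, so a flaky upstream listing stays reversible.

# Sketch: the periodic sweep flags anything unseen for 30 days...
await db.execute(
    "UPDATE companies SET is_active = false "
    "WHERE last_seen_at < now() - interval '30 days'"
)
# ...and the crawler flips the flag back whenever an item reappears.
await db.execute(
    "UPDATE companies SET is_active = true, last_seen_at = now() "
    "WHERE business_no = $1", business_no
)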
8. Backfill
async def crawl(mode="incremental"):
    # Backfill restarts from the beginning; incremental resumes from the checkpoint.
    since = datetime(2000, 1, 1, tzinfo=timezone.utc) if mode == "backfill" else await get_last_checkpoint()
Trigger backfills explicitly via a CLI flag or an admin endpoint, as sketched below; never run them on a schedule.
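A minimal CLI wrapper (the argparse flag names are illustrative) so a backfill is always a deliberate, logged action rather than something a scheduler can wander into:

import argparse
import asyncio

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["incremental", "backfill"],
                        default="incremental")
    args = parser.parse_args()
    asyncio.run(crawl(mode=args.mode))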
9. Gotchas
- Missed checkpoint update → the next run re-crawls the full set
- Missing UNIQUE constraint → duplicate rows accumulate silently
- Whitespace-only hash diffs → normalize before hashing (section 3)
- Hard-deleting items that merely disappeared → prefer a soft flag (section 7)
10. Data quality
async def validate(row):
    # Reject malformed rows before they reach storage.
    # (Note: assert is stripped under `python -O`; raise explicitly in production.)
    assert row.business_no and len(row.business_no) == 10
    assert row.name and len(row.name) < 200
Validate at ingest; bad rows never reach storage.
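Wired into ingest, validation becomes a filter: bad rows are logged and dropped, and only clean rows reach the upsert. A sketch, assuming batch, logger, and save_batch from earlier:

valid = []
for row in batch:
    try:
        await validate(row)
        valid.append(row)
    except AssertionError:
        # Drop the row but leave a trace for debugging the source.
        logger.warning("invalid row skipped", source="companies")
await save_batch(valid)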
Closing
Re-crawling everything daily is the fastest path to a ban. Checkpoints, UNIQUE constraints, and content hashes from day one cut maintenance for the life of the crawler.
Next
- 06-observability-alerts