5단계
증분 수집 · 중복 해소
25 분
증분 수집 · 중복 해소
매일 전량 크롤링은 낭비 · 차단 유발. 바뀐 것만 수집.
1. 체크포인트 — 마지막 성공 시점
CREATE TABLE crawl_checkpoints (
source VARCHAR PRIMARY KEY,
last_run_at TIMESTAMPTZ,
last_successful_id BIGINT
);
last = await db.fetchrow(
"SELECT last_run_at FROM crawl_checkpoints WHERE source = $1", "nps"
)
since = last["last_run_at"] if last else datetime(2000, 1, 1)
new_data = await fetch_since(since)
await db.execute(
"INSERT INTO crawl_checkpoints (source, last_run_at) VALUES ($1, now()) "
"ON CONFLICT (source) DO UPDATE SET last_run_at = EXCLUDED.last_run_at",
"nps"
)
2. 자연 키 UNIQUE
수집 데이터의 외부 ID (사업자번호 · 병원 ykiho · 문서 ID 등) 를 UNIQUE 컬럼.
CREATE TABLE companies (
id BIGSERIAL PRIMARY KEY,
business_no VARCHAR UNIQUE NOT NULL,
name VARCHAR NOT NULL,
updated_at TIMESTAMPTZ DEFAULT now()
);
INSERT INTO companies (business_no, name)
VALUES ($1, $2)
ON CONFLICT (business_no) DO UPDATE SET name = EXCLUDED.name, updated_at = now();
- 중복 삽입 자동 방어
- 변경 감지 (이름 바뀌면
updated_at갱신)
3. content hash — 본문 변경 감지
텍스트 긴 경우 전체 비교는 비쌈. hash 로 빠르게.
import hashlib
def content_hash(text: str) -> str:
return hashlib.sha256(text.encode()).hexdigest()
new_hash = content_hash(article.body)
row = await db.fetchrow("SELECT content_hash FROM articles WHERE url = $1", url)
if row and row["content_hash"] == new_hash:
return # 변경 없음 skip
await db.execute(
"INSERT INTO articles (url, body, content_hash) VALUES ($1, $2, $3) "
"ON CONFLICT (url) DO UPDATE SET body = $2, content_hash = $3, updated_at = now()",
url, article.body, new_hash
)
문자 한 개 변경도 hash 다름. 정밀 감지.
4. API 의 ?since= 파라미터
많은 공공 API 가 증분 지원.
resp = await client.get("https://opendart.fss.or.kr/list", params={
"corp_code": "...",
"bgn_de": since.strftime("%Y%m%d"), # 시작일
"end_de": today.strftime("%Y%m%d"),
})
크롤링 자체 대신 API 가 증분 제공하면 그쪽.
5. 페이지네이션 — 안 끝나는 크롤
async def crawl_all_pages(source: str):
page = 1
while True:
items = await fetch_page(source, page)
if not items:
break
await save_batch(items)
page += 1
if page > 1000:
logger.warning("exceeded page limit", source=source)
break
실수로 무한 루프 방지 — 항상 상한.
6. 병렬 수집 + 중복 해소
여러 워커가 동시 크롤링 시 race condition.
# 방법 A — 배치 내 중복 제거
items = [...] # 크롤 결과
seen = set()
unique = []
for item in items:
if item.id not in seen:
seen.add(item.id)
unique.append(item)
# 방법 B — DB UNIQUE + ON CONFLICT 에 위임
# (동시 INSERT 여도 DB 가 해결)
7. 삭제 감지
원본에서 사라진 데이터 (예: 폐업한 업체) 처리.
ALTER TABLE companies ADD COLUMN last_seen_at TIMESTAMPTZ;
-- 수집 시 모두 last_seen_at 갱신
UPDATE companies SET last_seen_at = now() WHERE business_no = $1;
-- 한 달 이상 미관측 → 폐업 의심
SELECT * FROM companies WHERE last_seen_at < now() - interval '30 days';
즉시 DELETE 는 위험. soft flag (is_active) 로.
8. 백필 (backfill)
기존 데이터가 일부만 있을 때 전체 재수집.
# 플래그로 구분
async def crawl(mode: str = "incremental"):
if mode == "backfill":
since = datetime(2000, 1, 1)
else:
since = await get_last_checkpoint()
# ...
운영자가 CLI · 관리 UI 에서 mode=backfill 명시적 실행.
9. 자주 걸리는 자리
- 체크포인트 갱신 누락 — 다음 run 에서 전량 재수집
- UNIQUE 컬럼 누락 — 중복 행 수만 증가
- hash 만 비교 — 의미상 동일하지만 공백 차이로 hash 다름. 정규화 필요
- 폐업 detection 없이 soft-delete 안 하고 행 삭제 — 과거 이력 소실
10. 데이터 품질 검증
async def validate(row):
assert row.business_no and len(row.business_no) == 10
assert row.name and len(row.name) < 200
# 예상 범위 체크
수집 시 즉시 검증. 잘못된 데이터 저장 방지.
하고픈 말
"전량 매일" 은 1 년 후 차단당할 길. 증분 수집은 예의 + 성능 + 안정성 3 가지 모두 얻습니다. 첫 주에 체크포인트 · UNIQUE · hash 를 세팅하면 이후 관리 비용 급감.
Next
- 06-observability-alerts