6단계
6단계 — 데이터 파이프라인
30 분
6단계 — 데이터 파이프라인
"외부 → 가공 → DB" 흐름이 한 줄로 이어지면 그게 곧 파이프라인 이에요. ETL 패턴이 표준.
ETL — Extract, Transform, Load
[외부 API/CSV/HTML]
↓ Extract (받아오기)
[원본 raw 데이터]
↓ Transform (정제)
[정형 데이터 (스키마 일치)]
↓ Load (저장)
[PostgreSQL row]
각 단계를 함수 로 분리해 두면 테스트와 재사용이 쉬워져요.
첫 파이프라인
# services/external_sync_service.py
from utils.http_client import RateLimitedClient
from db.connection import get_conn
client = RateLimitedClient("https://api.example.com", requests_per_second=2)
def extract():
"""외부 API 에서 원본 가져오기"""
return client.get("/items").json()
def transform(raw: list[dict]) -> list[dict]:
"""필드 정제·타입 변환"""
return [
{
"external_id": item["id"],
"name": item["name"].strip(),
"price": int(item["price"]),
"synced_at": datetime.now(tz=UTC),
}
for item in raw
if item.get("name") # 빈 이름 필터
]
def load(rows: list[dict]):
"""DB UPSERT"""
with get_conn() as conn, conn.cursor() as cur:
for row in rows:
cur.execute("""
INSERT INTO external_items (external_id, name, price, synced_at)
VALUES (%(external_id)s, %(name)s, %(price)s, %(synced_at)s)
ON CONFLICT (external_id) DO UPDATE
SET name = EXCLUDED.name,
price = EXCLUDED.price,
synced_at = EXCLUDED.synced_at
""", row)
def sync():
"""전체 흐름"""
raw = extract()
rows = transform(raw)
load(rows)
return len(rows)
세 함수가 각자의 책임 만 가져요 — 테스트하기도 쉬워요.
부분 실패 대응
10,000건 동기화 중 1건 실패하면 9,999건도 롤백할까? 경우에 따라.
- 소량·강한 정합성: 전체 트랜잭션 → 1건 실패 시 모두 롤백
- 대량·약한 정합성: 배치 단위 (예: 500개씩) 트랜잭션 → 부분 성공 허용
BATCH = 500
def load(rows: list[dict]):
for i in range(0, len(rows), BATCH):
batch = rows[i:i + BATCH]
try:
with get_conn() as conn, conn.cursor() as cur:
# … batch INSERT
pass
except Exception as e:
logger.error(f"batch {i}~{i + BATCH} failed: {e}")
continue # 다음 배치 진행
APScheduler 와 결합
scheduler.add_job(sync, IntervalTrigger(minutes=10))
10분마다 자동으로 외부 → DB 동기화. 4단계의 멱등성 + 5단계의 윤리가 모두 적용됨.
직접 해 보기
JSONPlaceholder 의 /posts 를 가져와 external_posts 테이블에 UPSERT 하는 파이프라인을 만들어 보세요. 10분 간격으로 자동 실행.
더 깊이
다음 단계
마지막 7단계에서는 이 모든 게 살아 있는지 자동으로 확인하는 헬스체크와 관측을 배워요.