Step 6 — Data pipeline
30 min
Pull from an external source, transform it, write it to the DB: that's a pipeline, and ETL is the standard pattern for it.
ETL — Extract, Transform, Load
[external API/CSV/HTML]
↓ Extract
[raw]
↓ Transform (clean, normalize)
[shaped rows]
↓ Load
[Postgres rows]
Three functions, three responsibilities — easy to test and reuse.
First pipeline
# client (the HTTP client) and get_conn() (the Postgres connection helper)
# come from the earlier steps.
from datetime import UTC, datetime

def extract():
    """Pull raw items from the external API."""
    return client.get("/items").json()

def transform(raw):
    """Clean and normalize raw items into row dicts; drop items without a name."""
    return [
        {
            "external_id": item["id"],
            "name": item["name"].strip(),
            "price": int(item["price"]),
            "synced_at": datetime.now(tz=UTC),
        }
        for item in raw
        if item.get("name")
    ]

def load(rows):
    """UPSERT rows so re-running the sync stays idempotent."""
    with get_conn() as conn, conn.cursor() as cur:
        for row in rows:
            cur.execute("""
                INSERT INTO external_items (external_id, name, price, synced_at)
                VALUES (%(external_id)s, %(name)s, %(price)s, %(synced_at)s)
                ON CONFLICT (external_id) DO UPDATE
                SET name = EXCLUDED.name,
                    price = EXCLUDED.price,
                    synced_at = EXCLUDED.synced_at
            """, row)

def sync():
    raw = extract()
    rows = transform(raw)
    load(rows)
    return len(rows)
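Because transform touches neither the network nor the database, it can be unit-tested with plain dicts. A pytest-style sketch (the sample payload is invented):

def test_transform_normalizes_and_drops_nameless():
    raw = [
        {"id": 1, "name": "  Keyboard ", "price": "42"},
        {"id": 2, "name": "", "price": "10"},  # no usable name: dropped
    ]
    rows = transform(raw)
    assert len(rows) == 1
    assert rows[0]["name"] == "Keyboard"
    assert rows[0]["price"] == 42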
Partial-failure strategy
If 1 row out of 10,000 fails, do you roll all of them back? It depends.
- Small + strict: one transaction; any failure rolls everything back
- Large + loose: 500-row batches, allowing partial success (sketched below)
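A minimal sketch of the batched strategy, assuming Python 3.12+ (for itertools.batched) and the same get_conn() helper as above; UPSERT_SQL is a hypothetical constant standing in for the INSERT ... ON CONFLICT statement from load():

import logging
from itertools import batched  # Python 3.12+

def load_batched(rows, batch_size=500):
    """Commit each batch in its own transaction; a failed batch is logged and skipped."""
    loaded = 0
    for batch in batched(rows, batch_size):
        try:
            # the connection context manager commits on success, rolls back on error
            with get_conn() as conn, conn.cursor() as cur:
                for row in batch:
                    cur.execute(UPSERT_SQL, row)  # UPSERT_SQL: placeholder for the SQL in load()
            loaded += len(batch)
        except Exception:
            logging.exception("batch of %d rows failed; continuing", len(batch))
    return loaded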
Combine with APScheduler
scheduler.add_job(sync, IntervalTrigger(minutes=10))
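Wired up in full, assuming APScheduler 3.x in a standalone process (swap in BackgroundScheduler if this runs alongside a web server):

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger

scheduler = BlockingScheduler()
scheduler.add_job(sync, IntervalTrigger(minutes=10))
scheduler.start()  # blocks here; sync() now runs every 10 minutes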
Idempotency (step 4) and ethics (step 5) both still apply here.
Try it
Pull JSONPlaceholder /posts and UPSERT into external_posts, every 10 minutes.
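One possible starting point, assuming an external_posts table with columns (external_id, title, body, synced_at); JSONPlaceholder posts expose userId, id, title, and body:

def extract_posts():
    # absolute URL so it works regardless of the client's base_url
    return client.get("https://jsonplaceholder.typicode.com/posts").json()

def transform_posts(raw):
    return [
        {
            "external_id": post["id"],
            "title": post["title"].strip(),
            "body": post["body"],
            "synced_at": datetime.now(tz=UTC),
        }
        for post in raw
    ]

Write the matching load_posts() with the same ON CONFLICT (external_id) UPSERT, then schedule it as above.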
Next
Step 7 makes the service observable.