Python
Backend frameworks, decorators, async, type hints, and Python's role as the bridge between software engineering and data
Python
Python occupies a unique position: it is both a serious backend language (FastAPI, Django) and the dominant language for data engineering, machine learning, and scripting. A senior engineer who knows Python well can move fluidly between building APIs, writing data pipelines, and automating infrastructure. That versatility is Python's superpower.
The trade-off is performance. Python is slow compared to Go, Java, or Rust. The GIL limits true parallelism. But for the vast majority of backend services, the bottleneck is I/O (database, network), not CPU -- and Python handles I/O-bound work well, especially with async.
Backend Frameworks
FastAPI
FastAPI is the modern choice for Python APIs. It is built on type hints and delivers automatic validation, serialization, and OpenAPI documentation:
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, EmailStr
from datetime import datetime
app = FastAPI()
class UserCreate(BaseModel):
    """Payload a client submits to create a user."""

    email: EmailStr  # pydantic rejects values that are not well-formed e-mail addresses
    name: str
    age: int
class UserResponse(BaseModel):
    """Shape of the user object sent back to the client."""

    id: str
    email: str
    name: str
    created_at: datetime
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate, db=Depends(get_db)):
if await db.users.find_one({"email": user.email}):
raise HTTPException(status_code=409, detail="Email already registered")
result = await db.users.insert_one(user.model_dump())
return UserResponse(
id=str(result.inserted_id),
email=user.email,
name=user.name,
created_at=datetime.utcnow(),
)
Dependency injection is central to FastAPI. Dependencies are declared as function parameters and resolved automatically:
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a session from ``async_session_factory`` for use as a dependency.

    The ``async with`` block exits the session's context once the consumer
    is done with the yielded value.
    """
    async with async_session_factory() as session:
        yield session
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the authenticated user from the request's token.

    Args:
        token: Token supplied by ``oauth2_scheme``.
        db: Session supplied by ``get_db``.

    Returns:
        The ``User`` whose primary key equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token carries no ``sub`` claim or no
            matching user exists.
    """
    payload = decode_jwt(token)
    try:
        user_id = payload["sub"]
    except KeyError:
        # A token without a subject identifies nobody: answer 401 instead of
        # letting the KeyError surface as a 500.
        raise HTTPException(status_code=401) from None
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=401)
    return user
@app.get("/me")
async def read_profile(user: User = Depends(get_current_user)):
return user
Django
Django remains the right choice for content-heavy applications, admin interfaces, and teams that want batteries-included:
# models.py
from django.db import models
class Article(models.Model):
title = models.CharField(max_length=200)
slug = models.SlugField(unique=True)
content = models.TextField()
published_at = models.DateTimeField(null=True, blank=True)
author = models.ForeignKey("auth.User", on_delete=models.CASCADE)
class Meta:
ordering = ["-published_at"]
def is_published(self) -> bool:
return self.published_at is not None
Django middleware intercepts every request and response. It is the right place for cross-cutting concerns:
class RequestTimingMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
start = time.perf_counter()
response = self.get_response(request)
duration = time.perf_counter() - start
response["X-Request-Duration"] = f"{duration:.4f}s"
return response
Decorators Deep Dive
Decorators are functions that wrap other functions. Understanding them unlocks metaprogramming in Python.
Basic Decorator Pattern
import functools
import time
from typing import Callable, TypeVar, ParamSpec
P = ParamSpec("P")
R = TypeVar("R")
def timer(func: Callable[P, R]) -> Callable[P, R]:
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f"{func.__name__} took {elapsed:.4f}s")
return result
return wrapper
@timer
def fetch_users(limit: int) -> list[dict]:
# ... database query
pass
Decorator with Parameters
def retry(max_attempts: int = 3, delay: float = 1.0):
    """Build a decorator that retries a failing call with exponential backoff.

    Args:
        max_attempts: Total number of calls to make before giving up (>= 1).
        delay: Seconds to sleep after the first failure; doubled after each
            further failure.

    Returns:
        A decorator. The decorated function returns the first successful
        result, or re-raises the exception from the final attempt.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
    """
    if max_attempts < 1:
        # With zero attempts the loop body never runs and there would be no
        # exception to re-raise.
        raise ValueError("max_attempts must be at least 1")

    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            final_attempt = max_attempts - 1
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == final_attempt:
                        # Out of attempts: propagate with the original traceback.
                        raise
                    time.sleep(delay * (2 ** attempt))
            raise AssertionError("unreachable: the loop always returns or raises")

        return wrapper

    return decorator
@retry(max_attempts=5, delay=0.5)
def call_external_api(url: str) -> dict:
# ... HTTP request that might fail
pass
Context Managers
Context managers ensure resources are properly acquired and released. The with statement is their interface.
from contextlib import contextmanager, asynccontextmanager
from typing import AsyncGenerator
@contextmanager
def database_transaction(connection):
    """Run the ``with`` body inside a transaction on ``connection``.

    Args:
        connection: Object whose ``begin()`` returns a transaction exposing
            ``commit()`` and ``rollback()``.

    Yields:
        The transaction returned by ``connection.begin()``.

    The transaction is committed when the body finishes normally. If the body
    (or the commit itself) raises, the transaction is rolled back and the
    exception propagates.
    """
    tx = connection.begin()
    try:
        yield tx
        tx.commit()
    except BaseException:
        # BaseException, not Exception: KeyboardInterrupt and task
        # cancellation must also roll back rather than leave the
        # transaction open.
        tx.rollback()
        raise
# Usage: both statements share one transaction.
with database_transaction(conn) as tx:
    tx.execute("INSERT INTO users ...")
    tx.execute("INSERT INTO audit_log ...")
# Commits on success, rolls back on exception
# Async version
@asynccontextmanager
async def managed_connection(pool) -> AsyncGenerator:
conn = await pool.acquire()
try:
yield conn
finally:
await pool.release(conn)
Generators and Iterators
Generators produce values lazily, one at a time. They are essential for processing large datasets without loading everything into memory:
def read_large_file(path: str, chunk_size: int = 8192):
    """Read a file in chunks without loading it all into memory.

    Args:
        path: File to read; opened in binary mode.
        chunk_size: Maximum number of bytes per chunk; must be positive.

    Yields:
        Successive ``bytes`` chunks; the last one may be shorter.

    Raises:
        ValueError: If ``chunk_size`` is not positive. As this is a
            generator, the error surfaces on the first iteration.
    """
    if chunk_size <= 0:
        # read(0) returns b"" (nothing would be yielded) and a negative size
        # reads the entire file at once, defeating the purpose.
        raise ValueError("chunk_size must be a positive integer")
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk
def batch(iterable, size: int):
    """Group items into fixed-size batches.

    Args:
        iterable: Any iterable; consumed lazily.
        size: Number of items per batch; must be at least 1.

    Yields:
        Lists of ``size`` items, in input order; the final list may be
        shorter. Nothing is yielded for an empty input.

    Raises:
        ValueError: If ``size`` is less than 1. As this is a generator, the
            error surfaces on the first iteration.
    """
    if size < 1:
        # With size <= 0 the length check never matches, so the whole input
        # would be buffered and emitted as a single batch.
        raise ValueError("size must be at least 1")
    current = []  # not named ``batch``: that would shadow this function
    for item in iterable:
        current.append(item)
        if len(current) == size:
            yield current
            current = []
    if current:
        yield current
# Process millions of records in batches of 1000
for record_batch in batch(read_csv_rows("huge.csv"), 1000):
bulk_insert(record_batch)
Dataclasses and Pydantic v2
Dataclasses for Internal Data
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class OrderItem:
    """One line of an order: a product, a quantity, and a unit price."""

    product_id: str
    quantity: int
    # NOTE(review): float prices can accumulate rounding error -- consider
    # Decimal or integer minor units if exact totals matter.
    unit_price: float

    @property
    def total(self) -> float:
        """Price of this line: ``quantity * unit_price``."""
        return self.quantity * self.unit_price
@dataclass
class Order:
id: str
customer_id: str
items: list[OrderItem] = field(default_factory=list)
created_at: datetime = field(default_factory=datetime.utcnow)
@property
def total(self) -> float:
return sum(item.total for item in self.items)
Pydantic v2 for External Data
Pydantic validates data at the boundary -- API inputs, config files, external service responses. V2 is a ground-up rewrite in Rust, significantly faster:
from pydantic import BaseModel, field_validator, model_validator
class Config(BaseModel):
database_url: str
redis_url: str
max_connections: int = 10
debug: bool = False
@field_validator("max_connections")
@classmethod
def validate_max_connections(cls, v: int) -> int:
if v < 1 or v > 100:
raise ValueError("max_connections must be between 1 and 100")
return v
@model_validator(mode="after")
def validate_debug_settings(self):
if self.debug and self.max_connections > 5:
raise ValueError("In debug mode, max_connections should be <= 5")
return self
Type Hints
Python's type system is gradual -- you add types incrementally. Modern Python (3.10+) type hints are expressive:
import json
from typing import Literal, Protocol, TypeVar, overload, runtime_checkable
# Protocol: structural subtyping (like Go interfaces)
@runtime_checkable
class Serializable(Protocol):
    """Anything that can describe itself as a ``dict`` via ``to_dict()``.

    ``@runtime_checkable`` lets ``isinstance(obj, Serializable)`` test for
    the presence of the method.
    """

    def to_dict(self) -> dict: ...
class User:
    """Minimal user that satisfies a ``to_dict()`` protocol structurally."""

    def __init__(self, name: str = "") -> None:
        # to_dict() reads self.name, so it has to be set at construction time.
        self.name = name

    def to_dict(self) -> dict:
        """Return the user's fields as a plain dictionary."""
        return {"name": self.name}
def serialize(obj: Serializable) -> str:
    """Return ``obj.to_dict()`` encoded as a JSON string."""
    return json.dumps(obj.to_dict())
# User satisfies Serializable without inheriting from it
# Overload: different return types based on input
# Only the Literal[False] overload carries a default, so a bare ``fetch(id)``
# resolves to exactly one signature; the ``bool`` overload covers callers
# that pass a non-literal flag.
@overload
def fetch(id: str, many: Literal[False] = ...) -> User: ...
@overload
def fetch(id: str, many: Literal[True]) -> list[User]: ...
@overload
def fetch(id: str, many: bool) -> User | list[User]: ...
def fetch(id: str, many: bool = False) -> User | list[User]:
    """Look up users by ``id``.

    Args:
        id: Identifier passed to the database lookup. (The name shadows the
            builtin but is kept because callers may pass it by keyword.)
        many: When true, return the result of ``db.find_all``; otherwise the
            result of ``db.find_one``.
    """
    if many:
        return db.find_all(id)
    return db.find_one(id)
# TypeVar with bounds
T = TypeVar("T", bound="BaseModel")
def create_and_save(model_cls: type[T], data: dict) -> T:
instance = model_cls(**data)
instance.save()
return instance
Async/Await
Python's async model is single-threaded, event-loop-based, similar to Node.js. It excels at I/O-bound work:
import asyncio
import httpx
async def fetch_user_data(
    user_id: str, *, base_url: str = "https://api.example.com"
) -> dict:
    """Fetch a user's profile, orders, and preferences concurrently.

    Args:
        user_id: Identifier interpolated into each request path.
        base_url: Origin the relative ``/api/users/...`` paths are resolved
            against. httpx rejects relative URLs on a client that has no
            base URL, so one must be configured; the default is a
            placeholder.

    Returns:
        A dict with keys ``"profile"``, ``"orders"`` and ``"preferences"``,
        each holding the decoded JSON body of the matching response.

    Raises:
        httpx.HTTPStatusError: If any of the three responses has a 4xx/5xx
            status.
    """
    sections = ("profile", "orders", "preferences")
    async with httpx.AsyncClient(base_url=base_url) as client:
        # These run concurrently, not sequentially
        responses = await asyncio.gather(
            *(client.get(f"/api/users/{user_id}/{section}") for section in sections)
        )
    for response in responses:
        # Surface HTTP errors instead of decoding an error body as data.
        response.raise_for_status()
    return {
        section: response.json()
        for section, response in zip(sections, responses)
    }
# TaskGroup (Python 3.11+) -- structured concurrency
async def fetch_with_error_handling(urls: list[str]) -> list[str]:
results = []
async with asyncio.TaskGroup() as tg:
for url in urls:
tg.create_task(fetch_one(url))
# If any task raises, all others are cancelled
return results
# Semaphore for rate limiting
async def fetch_many(urls: list[str], max_concurrent: int = 10) -> list[str]:
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_one(url: str) -> str:
async with semaphore:
async with httpx.AsyncClient() as client:
resp = await client.get(url)
return resp.text
return await asyncio.gather(*[fetch_one(url) for url in urls])
The GIL and Multiprocessing
The Global Interpreter Lock (GIL) prevents true parallel execution of Python bytecode. For CPU-bound work, use multiprocessing:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import multiprocessing as mp
# CPU-bound: use processes (bypasses GIL)
def process_image(path: str) -> str:
    """Stand-in for a CPU-heavy image job; returns ``"processed:<path>"``."""
    # heavy computation
    return f"processed:{path}"
# Worker processes started with the "spawn" method re-import this module, so
# the pool must only be created when the file is run as the main script.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=mp.cpu_count()) as pool:
        results = list(pool.map(process_image, image_paths))
# I/O-bound: threads are fine (GIL is released during I/O)
def download(url: str) -> bytes:
    """Fetch ``url`` and return the raw response body.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    response = httpx.get(url)
    # Fail loudly rather than hand back an error page as if it were the payload.
    response.raise_for_status()
    return response.content
with ThreadPoolExecutor(max_workers=20) as pool:
results = list(pool.map(download, urls))
Note: Python 3.13 introduced a free-threaded build (no-GIL mode) as an experimental feature. It is not yet production-ready, but signals the direction Python is heading.
Virtual Environments and Tooling
The Python packaging ecosystem has historically been painful. Modern tools have improved things significantly:
| Tool | Purpose | When to Use |
|---|---|---|
| uv | Fast package installer and resolver | New projects -- it is dramatically faster than pip |
| poetry | Dependency management with lockfile | Teams that need reproducible builds |
| pip + venv | Built-in, always available | Simple projects, CI environments |
| ruff | Linter and formatter (replaces flake8, black, isort) | Every project -- it is fast and comprehensive |
| mypy / pyright | Static type checking | Any project with type hints |
Testing with pytest
import pytest
from unittest.mock import AsyncMock, patch
# Fixtures for dependency injection
@pytest.fixture
def db_session():
    """Provide a session from ``create_test_session()`` to the test."""
    session = create_test_session()
    yield session
    # Teardown: runs after the test body, whether it passed or failed.
    session.rollback()
# Parametrized tests (like Go's table-driven tests)
@pytest.mark.parametrize("raw,expected", [
    ("$100.00", 10000),
    ("$0.50", 50),
    ("-$25.00", -2500),
])
def test_parse_amount(raw: str, expected: int):
    """parse_amount returns the integer listed for each formatted amount."""
    # The argument is called ``raw`` so it does not shadow the builtin ``input``.
    assert parse_amount(raw) == expected
# Async test
@pytest.mark.asyncio
async def test_fetch_user(db_session):
    """A user created through the session can be read back by id."""
    user = await create_user(db_session, name="Alice")
    fetched = await get_user(db_session, user.id)
    assert fetched.name == "Alice"
# Mocking external services
# The asyncio marker matches test_fetch_user; without it pytest-asyncio in
# strict mode does not run the coroutine.
@pytest.mark.asyncio
async def test_payment_processing():
    """process_payment exposes the charge id returned by the patched client."""
    with patch("app.services.stripe_client") as mock_stripe:
        mock_stripe.charge = AsyncMock(return_value={"id": "ch_123"})
        result = await process_payment(amount=1000, currency="nzd")
        assert result.stripe_id == "ch_123"
        mock_stripe.charge.assert_called_once()
# conftest.py -- shared fixtures across test modules
@pytest.fixture(scope="session")
def app():
    """Create the application once per test session with ``testing=True``."""
    return create_app(testing=True)
@pytest.fixture(autouse=True)
def reset_db(db_session):
yield
db_session.rollback()
Python for Data Engineering
Python's unique strength is bridging the gap between software engineering and data work. The same language writes the API, the ETL pipeline, and the ML training script:
import polars as pl
# Polars for fast data processing (Rust-backed)
df = (
pl.scan_csv("transactions.csv")
.filter(pl.col("amount") > 100)
.group_by("merchant_id")
.agg([
pl.col("amount").sum().alias("total"),
pl.col("amount").count().alias("count"),
pl.col("amount").mean().alias("average"),
])
.sort("total", descending=True)
.collect()
)
This bridge between backend engineering and data is why Python remains indispensable despite its performance limitations. In NZ, nearly every data team uses Python, and backend engineers who can also write data pipelines are highly valued.