Steven's Knowledge

Python

Backend frameworks, decorators, async, type hints, and Python's role as the bridge between software engineering and data

Python

Python occupies a unique position: it is both a serious backend language (FastAPI, Django) and the dominant language for data engineering, machine learning, and scripting. A senior engineer who knows Python well can move fluidly between building APIs, writing data pipelines, and automating infrastructure. That versatility is Python's superpower.

The trade-off is performance. Python is slow compared to Go, Java, or Rust. The GIL limits true parallelism. But for the vast majority of backend services, the bottleneck is I/O (database, network), not CPU -- and Python handles I/O-bound work well, especially with async.

Backend Frameworks

FastAPI

FastAPI is the modern choice for Python APIs. It is built on type hints and delivers automatic validation, serialization, and OpenAPI documentation:

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, EmailStr
from datetime import datetime

app = FastAPI()

class UserCreate(BaseModel):
    email: EmailStr
    name: str
    age: int

class UserResponse(BaseModel):
    id: str
    email: str
    name: str
    created_at: datetime

@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate, db=Depends(get_db)):
    if await db.users.find_one({"email": user.email}):
        raise HTTPException(status_code=409, detail="Email already registered")

    result = await db.users.insert_one(user.model_dump())
    return UserResponse(
        id=str(result.inserted_id),
        email=user.email,
        name=user.name,
        created_at=datetime.utcnow(),
    )

Dependency injection is central to FastAPI. Dependencies are declared as function parameters and resolved automatically:

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_factory() as session:
        yield session

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    payload = decode_jwt(token)
    user = await db.get(User, payload["sub"])
    if not user:
        raise HTTPException(status_code=401)
    return user

@app.get("/me")
async def read_profile(user: User = Depends(get_current_user)):
    return user

Django

Django remains the right choice for content-heavy applications, admin interfaces, and teams that want batteries-included:

# models.py
from django.db import models

class Article(models.Model):
    title = models.CharField(max_length=200)
    slug = models.SlugField(unique=True)
    content = models.TextField()
    published_at = models.DateTimeField(null=True, blank=True)
    author = models.ForeignKey("auth.User", on_delete=models.CASCADE)

    class Meta:
        ordering = ["-published_at"]

    def is_published(self) -> bool:
        return self.published_at is not None

Django middleware intercepts every request and response. It is the right place for cross-cutting concerns:

class RequestTimingMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        start = time.perf_counter()
        response = self.get_response(request)
        duration = time.perf_counter() - start
        response["X-Request-Duration"] = f"{duration:.4f}s"
        return response

Decorators Deep Dive

Decorators are functions that wrap other functions. Understanding them unlocks metaprogramming in Python.

Basic Decorator Pattern

import functools
import time
from typing import Callable, TypeVar, ParamSpec

P = ParamSpec("P")
R = TypeVar("R")

def timer(func: Callable[P, R]) -> Callable[P, R]:
    @functools.wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper

@timer
def fetch_users(limit: int) -> list[dict]:
    # ... database query
    pass

Decorator with Parameters

def retry(max_attempts: int = 3, delay: float = 1.0):
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        time.sleep(delay * (2 ** attempt))
            raise last_exception
        return wrapper
    return decorator

@retry(max_attempts=5, delay=0.5)
def call_external_api(url: str) -> dict:
    # ... HTTP request that might fail
    pass

Context Managers

Context managers ensure resources are properly acquired and released. The with statement is their interface.

from contextlib import contextmanager, asynccontextmanager
from typing import AsyncGenerator

@contextmanager
def database_transaction(connection):
    tx = connection.begin()
    try:
        yield tx
        tx.commit()
    except Exception:
        tx.rollback()
        raise

# Usage
with database_transaction(conn) as tx:
    tx.execute("INSERT INTO users ...")
    tx.execute("INSERT INTO audit_log ...")
    # Commits on success, rolls back on exception

# Async version
@asynccontextmanager
async def managed_connection(pool) -> AsyncGenerator:
    conn = await pool.acquire()
    try:
        yield conn
    finally:
        await pool.release(conn)

Generators and Iterators

Generators produce values lazily, one at a time. They are essential for processing large datasets without loading everything into memory:

def read_large_file(path: str, chunk_size: int = 8192):
    """Read a file in chunks without loading it all into memory."""
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk

def batch(iterable, size: int):
    """Group items into fixed-size batches."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

# Process millions of records in batches of 1000
for record_batch in batch(read_csv_rows("huge.csv"), 1000):
    bulk_insert(record_batch)

Dataclasses and Pydantic v2

Dataclasses for Internal Data

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class OrderItem:
    product_id: str
    quantity: int
    unit_price: float

    @property
    def total(self) -> float:
        return self.quantity * self.unit_price

@dataclass
class Order:
    id: str
    customer_id: str
    items: list[OrderItem] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.utcnow)

    @property
    def total(self) -> float:
        return sum(item.total for item in self.items)

Pydantic v2 for External Data

Pydantic validates data at the boundary -- API inputs, config files, external service responses. V2 is a ground-up rewrite in Rust, significantly faster:

from pydantic import BaseModel, field_validator, model_validator

class Config(BaseModel):
    database_url: str
    redis_url: str
    max_connections: int = 10
    debug: bool = False

    @field_validator("max_connections")
    @classmethod
    def validate_max_connections(cls, v: int) -> int:
        if v < 1 or v > 100:
            raise ValueError("max_connections must be between 1 and 100")
        return v

    @model_validator(mode="after")
    def validate_debug_settings(self):
        if self.debug and self.max_connections > 5:
            raise ValueError("In debug mode, max_connections should be <= 5")
        return self

Type Hints

Python's type system is gradual -- you add types incrementally. Modern Python (3.10+) type hints are expressive:

from typing import TypeVar, Protocol, overload, runtime_checkable

# Protocol: structural subtyping (like Go interfaces)
@runtime_checkable
class Serializable(Protocol):
    def to_dict(self) -> dict: ...

class User:
    def to_dict(self) -> dict:
        return {"name": self.name}

def serialize(obj: Serializable) -> str:
    return json.dumps(obj.to_dict())
# User satisfies Serializable without inheriting from it

# Overload: different return types based on input
@overload
def fetch(id: str, many: Literal[False] = ...) -> User: ...
@overload
def fetch(id: str, many: Literal[True] = ...) -> list[User]: ...

def fetch(id: str, many: bool = False) -> User | list[User]:
    if many:
        return db.find_all(id)
    return db.find_one(id)

# TypeVar with bounds
T = TypeVar("T", bound="BaseModel")

def create_and_save(model_cls: type[T], data: dict) -> T:
    instance = model_cls(**data)
    instance.save()
    return instance

Async/Await

Python's async model is single-threaded, event-loop-based, similar to Node.js. It excels at I/O-bound work:

import asyncio
import httpx

async def fetch_user_data(user_id: str) -> dict:
    async with httpx.AsyncClient() as client:
        # These run concurrently, not sequentially
        profile, orders, preferences = await asyncio.gather(
            client.get(f"/api/users/{user_id}/profile"),
            client.get(f"/api/users/{user_id}/orders"),
            client.get(f"/api/users/{user_id}/preferences"),
        )
    return {
        "profile": profile.json(),
        "orders": orders.json(),
        "preferences": preferences.json(),
    }

# TaskGroup (Python 3.11+) -- structured concurrency
async def fetch_with_error_handling(urls: list[str]) -> list[str]:
    results = []
    async with asyncio.TaskGroup() as tg:
        for url in urls:
            tg.create_task(fetch_one(url))
    # If any task raises, all others are cancelled
    return results

# Semaphore for rate limiting
async def fetch_many(urls: list[str], max_concurrent: int = 10) -> list[str]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(url: str) -> str:
        async with semaphore:
            async with httpx.AsyncClient() as client:
                resp = await client.get(url)
                return resp.text

    return await asyncio.gather(*[fetch_one(url) for url in urls])

The GIL and Multiprocessing

The Global Interpreter Lock (GIL) prevents true parallel execution of Python bytecode. For CPU-bound work, use multiprocessing:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import multiprocessing as mp

# CPU-bound: use processes (bypasses GIL)
def process_image(path: str) -> str:
    # heavy computation
    return f"processed:{path}"

with ProcessPoolExecutor(max_workers=mp.cpu_count()) as pool:
    results = list(pool.map(process_image, image_paths))

# I/O-bound: threads are fine (GIL is released during I/O)
def download(url: str) -> bytes:
    return httpx.get(url).content

with ThreadPoolExecutor(max_workers=20) as pool:
    results = list(pool.map(download, urls))

Note: Python 3.13 introduced a free-threaded build (no-GIL mode) as an experimental feature. It is not yet production-ready, but signals the direction Python is heading.

Virtual Environments and Tooling

The Python packaging ecosystem has historically been painful. Modern tools have improved things significantly:

ToolPurposeWhen to Use
uvFast package installer and resolverNew projects -- it is dramatically faster than pip
poetryDependency management with lockfileTeams that need reproducible builds
pip + venvBuilt-in, always availableSimple projects, CI environments
ruffLinter and formatter (replaces flake8, black, isort)Every project -- it is fast and comprehensive
mypy / pyrightStatic type checkingAny project with type hints

Testing with pytest

import pytest
from unittest.mock import AsyncMock, patch

# Fixtures for dependency injection
@pytest.fixture
def db_session():
    session = create_test_session()
    yield session
    session.rollback()

# Parametrized tests (like Go's table-driven tests)
@pytest.mark.parametrize("input,expected", [
    ("$100.00", 10000),
    ("$0.50", 50),
    ("-$25.00", -2500),
])
def test_parse_amount(input: str, expected: int):
    assert parse_amount(input) == expected

# Async test
@pytest.mark.asyncio
async def test_fetch_user(db_session):
    user = await create_user(db_session, name="Alice")
    fetched = await get_user(db_session, user.id)
    assert fetched.name == "Alice"

# Mocking external services
async def test_payment_processing():
    with patch("app.services.stripe_client") as mock_stripe:
        mock_stripe.charge = AsyncMock(return_value={"id": "ch_123"})
        result = await process_payment(amount=1000, currency="nzd")
        assert result.stripe_id == "ch_123"
        mock_stripe.charge.assert_called_once()

# conftest.py -- shared fixtures across test modules
@pytest.fixture(scope="session")
def app():
    return create_app(testing=True)

@pytest.fixture(autouse=True)
def reset_db(db_session):
    yield
    db_session.rollback()

Python for Data Engineering

Python's unique strength is bridging the gap between software engineering and data work. The same language writes the API, the ETL pipeline, and the ML training script:

import polars as pl

# Polars for fast data processing (Rust-backed)
df = (
    pl.scan_csv("transactions.csv")
    .filter(pl.col("amount") > 100)
    .group_by("merchant_id")
    .agg([
        pl.col("amount").sum().alias("total"),
        pl.col("amount").count().alias("count"),
        pl.col("amount").mean().alias("average"),
    ])
    .sort("total", descending=True)
    .collect()
)

This bridge between backend engineering and data is why Python remains indispensable despite its performance limitations. In NZ, nearly every data team uses Python, and backend engineers who can also write data pipelines are highly valued.

On this page