Python Cheat Sheet

Modern Python (3.10+) reference for DevOps and platform engineers. Covers the language deeply, standard-library patterns, Azure SDK, and real automation workflows.

Versions: Python 3.10+ · uv 0.4+ · pytest 8.x+ · azure-identity 1.16+ · httpx 0.27+

Last reviewed: May 2026


Virtual Environments & Tooling

uv - the modern Python package manager ✅

uv replaces pip, venv, pip-tools, and pyenv with one tool. Its uv pip interface mirrors pip's commands, and installs are typically much faster than with pip.

Bash
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh
 
# Create a virtual environment
uv venv                          # creates .venv in the current directory
uv venv --python 3.12            # specific Python version
 
# Activate (same as venv)
source .venv/bin/activate        # Linux / macOS
.venv\Scripts\activate           # Windows
 
# Install packages
uv pip install requests azure-identity azure-mgmt-resource
 
# Install from requirements file
uv pip sync requirements.txt     # exact sync (removes extras)
uv pip install -r requirements.txt
 
# Add to pyproject.toml and install
uv add requests httpx
uv add --dev pytest pytest-asyncio ruff mypy
 
# Run without activating the venv
uv run python script.py
uv run pytest
 
# Lock and export
uv pip compile pyproject.toml -o requirements.txt

venv - standard library ⚠️ Legacy - prefer uv for new projects

Bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
deactivate

pyproject.toml - modern project config

TOML
[project]
name = "my-tool"
version = "1.0.0"
description = "Azure automation tooling"
requires-python = ">=3.10"
dependencies = [
    "azure-identity>=1.16",
    "azure-mgmt-resource>=23.0",
    "azure-keyvault-secrets>=4.8",
    "azure-storage-blob>=12.20",
    "httpx>=0.27",
    "pyyaml>=6.0",
    "typer>=0.12",
]
 
[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=0.23",
    "pytest-mock>=3.12",
    "ruff>=0.4",
    "mypy>=1.10",
]
 
[project.scripts]
my-tool = "my_tool.cli:app"
 
[tool.ruff]
line-length = 100
target-version = "py310"
 
[tool.ruff.lint]
select = ["E", "F", "I", "UP"]
 
[tool.mypy]
python_version = "3.10"
strict = true
 
[tool.pytest.ini_options]
asyncio_mode = "auto"

Shell helpers - mkvenv / avenv 🏠 Personal workflow helpers

Bash
function mkvenv() {
    local curdir; curdir=$(basename "$(pwd)")
    mkdir -p ~/.virtualenvs
    python3 -m venv ~/.virtualenvs/"${curdir}"
    echo "Created venv: ${curdir}"
}
 
function avenv() {
    local curdir; curdir=$(basename "$(pwd)")
    # if/else rather than && ||, so a failed activate is not reported as "No venv found"
    if [[ -d ~/.virtualenvs/"${curdir}" ]]; then
        source ~/.virtualenvs/"${curdir}"/bin/activate
    else
        echo "No venv found for: ${curdir}"
    fi
}

Core Language

F-strings (= debugging since 3.8)

Python
name = "world"
count = 42
ratio = 1 / 3
 
f"Hello, {name}!"                   # "Hello, world!"
f"{count:04d}"                      # "0042"
f"{ratio:.2%}"                      # "33.33%"
f"{name!r}"                         # "'world'"  - repr()
f"{name!s}"                         # "world"    - str()
 
# Debug shorthand (3.8+)
f"{name=}"                          # "name='world'"
f"{count * 2 = }"                   # "count * 2 = 84"
 
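# Multiline - adjacent f-strings are concatenated.
# Fine for log or display text; for SQL that will be executed, bind parameters instead.
table, env, limit = "resources", "prod", 10
query = (
    f"SELECT * FROM {table!r} "
    f"WHERE env = {env!r} "
    f"LIMIT {limit}"
)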
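
The !r conversions above quote values for display, which is not the same as making them safe to execute. A minimal sketch of the parameterised alternative, assuming the standard-library sqlite3 module and a hypothetical resources table invented purely for illustration:

```python
import sqlite3

# Hypothetical in-memory table, used only to illustrate parameter binding.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE resources (name TEXT, env TEXT)")
conn.executemany(
    "INSERT INTO resources VALUES (?, ?)",
    [("vm-1", "prod"), ("vm-2", "dev"), ("vm-3", "prod")],
)

env, limit = "prod", 10
# Values travel separately from the SQL text, so the driver handles quoting.
rows = conn.execute(
    "SELECT name FROM resources WHERE env = ? ORDER BY name LIMIT ?",
    (env, limit),
).fetchall()
names = [name for (name,) in rows]
conn.close()
```

Placeholders bind values only; identifiers such as table names still have to come from a trusted allow-list.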

Comprehensions

Python
numbers = [1, 2, 3, 4, 5, 6]
 
# List
squares      = [x**2 for x in numbers]
even_squares = [x**2 for x in numbers if x % 2 == 0]
 
# Dict
word_lengths = {word: len(word) for word in ["apple", "banana", "cherry"]}
 
# Set
unique_domains = {email.split("@")[1] for email in emails}
 
# Generator (lazy - use when you don't need to store all values)
total = sum(x**2 for x in range(1_000_000))
 
# Nested
matrix    = [[1, 2], [3, 4], [5, 6]]
flattened = [cell for row in matrix for cell in row]  # [1, 2, 3, 4, 5, 6]

Walrus operator := (3.8+)

Python
import re
 
# Avoid calling the function twice
if m := re.search(r"(\d+)", line):
    print(m.group(1))
 
# In a while loop
while chunk := file.read(8192):
    process(chunk)
 
# In a comprehension - keep the value without extra filtering pass
results = [y for x in data if (y := transform(x)) is not None]

match / case (3.10+)

Python
def handle_event(event: dict) -> str:
    match event:
        case {"type": "created", "resource": str(name)}:
            return f"Resource created: {name}"
 
        case {"type": "deleted", "resource": str(name), "soft_delete": True}:
            return f"Soft-deleted: {name} (recoverable)"
 
        case {"type": "error", "code": int(code)} if code >= 500:
            return f"Server error: {code}"
 
        case {"type": "error", "code": int(code)}:
            return f"Client error: {code}"
 
        case _:
            return "Unknown event"
 
 
# Matching types and structures
def describe(value):
    match value:
        case int() | float():           return "number"
        case str() as s if len(s) > 10: return "long string"
        case str():                     return "short string"
        case list() as lst:             return f"list of {len(lst)}"
        case None:                      return "nothing"

Type hints (modern style)

Python
from __future__ import annotations  # defer annotation evaluation - optional on 3.10+, allows forward references
from typing import Any, Callable, TypeVar, Generic, Protocol
 
# Built-in generics (3.9+ - no need to import from typing)
def process(items: list[str]) -> dict[str, int]:
    return {item: len(item) for item in items}
 
# Union - prefer X | Y over Optional[X] (3.10+)
def find(key: str) -> str | None: ...
 
# TypeVar for generic functions
T = TypeVar("T")
def first(items: list[T]) -> T | None:
    return items[0] if items else None
 
# Callable
Predicate = Callable[[str], bool]
Handler   = Callable[[dict[str, Any]], None]
 
# TypedDict - typed dictionary shape
from typing import TypedDict
from typing_extensions import NotRequired  # built-in typing.NotRequired requires Python 3.11+
 
class VMConfig(TypedDict):
    name: str
    size: str
    location: str
    tags: NotRequired[dict[str, str]]   # optional key
 
# Protocol - structural subtyping (duck typing with types)
class Closeable(Protocol):
    def close(self) -> None: ...
 
def cleanup(resource: Closeable) -> None:
    resource.close()

Unpacking & starred expressions

Python
first, *rest     = [1, 2, 3, 4, 5]    # first=1, rest=[2,3,4,5]
*head, last      = [1, 2, 3, 4, 5]    # head=[1,2,3,4], last=5
a, *_, z         = [1, 2, 3, 4, 5]    # a=1, z=5, middle discarded
 
# Merge dicts - later keys win
defaults = {"retries": 3, "timeout": 30}
overrides = {"timeout": 60}
config = {**defaults, **overrides}     # 3.5+  {"retries": 3, "timeout": 60}
config = defaults | overrides          # 3.9+  same result
 
# Unpack in function call
args = ["UK South", "Premium_LRS"]
create_disk(*args)
 
kwargs = {"location": "UK South", "sku": "Premium_LRS"}
create_disk(**kwargs)

Data Structures

Dataclasses

Python
from dataclasses import dataclass, field, KW_ONLY
from typing import ClassVar
 
@dataclass
class AzureResource:
    name: str
    resource_group: str
    location: str
    tags: dict[str, str]  = field(default_factory=dict)
    _id: str              = field(default="", repr=False, compare=False)
 
    resource_type: ClassVar[str] = "Microsoft.Unknown/unknown"
 
    def __post_init__(self) -> None:
        # Normalise after init
        self.name     = self.name.lower().strip()
        self.location = self.location.replace(" ", "").lower()
 
    @property
    def resource_id(self) -> str:
        return self._id or f"(pending) {self.name}"
 
 
@dataclass(frozen=True)   # immutable, hashable - good for dict keys and sets
class Region:
    shorthand: str
    display_name: str
 
 
@dataclass(slots=True)    # uses __slots__ for lower memory, faster attribute access
class Metric:
    name: str
    value: float
    unit: str = "count"
 
 
@dataclass(kw_only=True)  # all fields must be passed as keywords
class VMConfig:
    name: str
    size: str
    os_disk_sku: str  = "Premium_LRS"
    data_disk_gb: int = 128

Enums

Python
from enum import Enum, StrEnum, auto   # StrEnum requires Python 3.11+
 
class Environment(StrEnum):        # StrEnum members compare equal to their string value
    PROD    = "prod"
    STAGING = "staging"
    DEV     = "dev"
 
class DiskSku(StrEnum):
    PREMIUM_LRS    = "Premium_LRS"
    STANDARD_LRS   = "Standard_LRS"
    ULTRA_SSD_LRS  = "UltraSSD_LRS"
 
class Severity(Enum):
    LOW    = auto()
    MEDIUM = auto()
    HIGH   = auto()
    CRITICAL = auto()
 
# Usage
env = Environment("prod")        # Environment.PROD
env == "prod"                    # True  - because StrEnum
f"Deploying to {env}"            # "Deploying to prod"
 
for sev in Severity:             # iterate all members
    print(sev.name, sev.value)

NamedTuple

Python
from typing import NamedTuple
 
class SubnetConfig(NamedTuple):
    name: str
    cidr: str
    service_endpoints: tuple[str, ...] = ()   # immutable default - a [] default would be shared by every instance
 
app_subnet = SubnetConfig("app", "10.0.1.0/24", ("Microsoft.KeyVault",))
app_subnet.name        # "app"
app_subnet._asdict()   # {"name": "app", "cidr": "10.0.1.0/24", ...}

collections - Counter, defaultdict, deque

Python
from collections import Counter, defaultdict, deque
 
# Counter - frequency map
words    = ["a", "b", "a", "c", "b", "a"]
counts   = Counter(words)        # Counter({"a": 3, "b": 2, "c": 1})
counts.most_common(2)            # [("a", 3), ("b", 2)]
counts["a"]                      # 3
counts["z"]                      # 0 (no KeyError)
 
# defaultdict - missing key creates a default
groups: defaultdict[str, list[str]] = defaultdict(list)
for resource in resources:
    groups[resource.resource_group].append(resource.name)
 
# deque - efficient queue / stack
queue: deque[str] = deque(maxlen=100)  # fixed-size ring buffer
queue.appendleft("new")                # O(1) prepend
queue.popleft()                        # O(1) remove from front
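
A short sketch of what maxlen does in practice: once the deque is full, each append silently discards the item at the opposite end. The event names are made up for illustration.

```python
from collections import deque

recent_events: deque[str] = deque(maxlen=3)    # keep only the 3 most recent items
for event in ["create", "update", "scale", "delete"]:
    recent_events.append(event)                # once full, the oldest item drops off the left

snapshot = list(recent_events)                 # "create" has been discarded
```

This makes a bounded deque a convenient "last N log lines" buffer with no manual trimming.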

Pathlib & File I/O

pathlib - path operations

Python
from pathlib import Path
 
base    = Path(__file__).parent         # directory of the current file
config  = base / "config" / "prod.yaml"
 
config.exists()                         # bool
config.is_file()                        # bool
config.suffix                           # ".yaml"
config.stem                             # "prod"
config.name                             # "prod.yaml"
config.parent                           # config/
 
# Glob
for tf_file in base.rglob("*.tf"):
    print(tf_file.relative_to(base))
 
# Read / write
text     = config.read_text(encoding="utf-8")
config.write_text("content", encoding="utf-8")
raw      = config.read_bytes()
 
# Create directories
(base / "output").mkdir(parents=True, exist_ok=True)
 
# List contents
children = list(base.iterdir())
tf_files = sorted(base.glob("*.tf"))
 
# Resolve symlinks and make absolute
absolute = config.resolve()

JSON

Python
import json
from pathlib import Path
 
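# Load
data: dict = json.loads(text)                        # from a string
with open("config.json", encoding="utf-8") as f:     # from a file - the with block closes the handle
    data = json.load(f)
data = json.loads(Path("config.json").read_text(encoding="utf-8"))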
 
# Dump
text = json.dumps(data, indent=2, sort_keys=True, default=str)
Path("output.json").write_text(json.dumps(data, indent=2))
 
# Pretty-print to stdout
print(json.dumps(data, indent=2))
 
# Handle dates and custom objects
from datetime import datetime
json.dumps({"ts": datetime.now()}, default=str)   # str() fallback

YAML

Python
import yaml
from pathlib import Path
 
# Load - always use safe_load, never load() (arbitrary code execution risk)
data = yaml.safe_load(Path("config.yaml").read_text())
docs = list(yaml.safe_load_all(text))   # multiple documents in one file
 
# Dump
Path("output.yaml").write_text(yaml.safe_dump(data, default_flow_style=False, allow_unicode=True))
 
# Round-trip with ruamel.yaml (preserves comments and ordering)
from ruamel.yaml import YAML
yml = YAML()
yml.preserve_quotes = True
with open("config.yaml") as f:
    doc = yml.load(f)
doc["version"] = "2"
with open("config.yaml", "w") as f:
    yml.dump(doc, f)

TOML

Python
# Python 3.11+ - tomllib is in the standard library (read-only)
# On 3.10 the tomli package offers the same API: import tomli as tomllib
import tomllib
from pathlib import Path
 
with open("pyproject.toml", "rb") as f:
    config = tomllib.load(f)
 
# Write TOML - use tomli-w (pip install tomli-w)
import tomli_w
Path("output.toml").write_text(tomli_w.dumps(config), encoding="utf-8")

Environment variables and .env files

Python
import os
from pathlib import Path
 
# Read with fallback
db_host = os.environ.get("DB_HOST", "localhost")
 
# Raise clearly if missing
subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]   # KeyError if absent
 
# Load .env file (python-dotenv)
from dotenv import load_dotenv
load_dotenv()                        # reads .env in cwd
load_dotenv(Path(__file__).parent / ".env.prod")
 
# Build a typed config object from env vars
from dataclasses import dataclass, field
 
@dataclass
class Config:
    subscription_id: str = field(default_factory=lambda: os.environ["AZURE_SUBSCRIPTION_ID"])
    tenant_id:       str = field(default_factory=lambda: os.environ["AZURE_TENANT_ID"])
    client_id:       str = field(default_factory=lambda: os.environ.get("AZURE_CLIENT_ID", ""))
    environment:     str = field(default_factory=lambda: os.environ.get("ENVIRONMENT", "dev"))
 
config = Config()
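
A bare KeyError names only the first missing variable. A minimal sketch of a helper that reports every missing variable at once; require_env and MissingEnvError are hypothetical names, not part of any library:

```python
import os


class MissingEnvError(RuntimeError):
    """Hypothetical error type: raised when required variables are absent or empty."""


def require_env(*names: str) -> dict[str, str]:
    # Collect every missing name first so the operator sees the full list in one run
    missing = [name for name in names if not os.environ.get(name)]
    if missing:
        raise MissingEnvError("Missing environment variables: " + ", ".join(missing))
    return {name: os.environ[name] for name in names}
```

Calling require_env("AZURE_SUBSCRIPTION_ID", "AZURE_TENANT_ID") at startup fails fast, before any Azure client is constructed.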

Subprocess & Shell

subprocess.run - execute commands

Python
import os
import shlex
import subprocess
 
# Simple - raises on non-zero exit, captures output
result = subprocess.run(
    ["terraform", "plan", "-out", "tfplan.plan"],
    capture_output=True,
    text=True,
    check=True,         # raises CalledProcessError on failure
    cwd="/path/to/module",
    env={**os.environ, "TF_VAR_env": "prod"},
    timeout=300,
)
print(result.stdout)
 
# Accept a string - shlex.split handles quoting correctly
def run(cmd: str, **kwargs) -> subprocess.CompletedProcess:
    return subprocess.run(shlex.split(cmd), text=True, check=True, **kwargs)
 
# Suppress output
subprocess.run(["az", "login"], check=True, capture_output=True)
 
# Allow failure - check=False
result = subprocess.run(["git", "diff", "--exit-code"], capture_output=True)
has_changes = result.returncode != 0

Stream output in real time

Python
def stream(cmd: list[str], *, cwd: str | None = None) -> int:
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout
        text=True,
        cwd=cwd,
    ) as proc:
        for line in proc.stdout:
            print(line, end="", flush=True)
    return proc.returncode
 
exit_code = stream(["terraform", "apply", "-auto-approve", "tfplan.plan"])
if exit_code != 0:
    raise RuntimeError(f"Terraform apply failed with exit code {exit_code}")

Capture stdout and stderr separately

Python
import json
import subprocess
 
try:
    result = subprocess.run(
        ["az", "group", "show", "--name", "rg-prod"],
        capture_output=True,
        text=True,
        check=True,
    )
    group = json.loads(result.stdout)
except subprocess.CalledProcessError as e:
    print(f"Command failed (exit {e.returncode}): {e.stderr.strip()}")
    raise

HTTP & APIs

requests session with retry and timeout

Python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
 
def make_session(
    *,
    retries: int   = 3,
    backoff: float = 0.5,
) -> requests.Session:
    # requests has no session-wide timeout - pass timeout= on every call
    session = requests.Session()
    retry_cfg = Retry(
        total=retries,
        backoff_factor=backoff,
        status_forcelist=[429, 500, 502, 503, 504],
        # POST and PATCH are not idempotent - retry them only if the API tolerates duplicates
        allowed_methods={"GET", "POST", "PUT", "PATCH", "DELETE"},
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry_cfg)
    session.mount("https://", adapter)
    session.mount("http://",  adapter)
    session.headers.update({"User-Agent": "my-tool/1.0"})
    return session
 
session = make_session()
r = session.get("https://management.azure.com/subscriptions", timeout=30)
r.raise_for_status()
data = r.json()

httpx - async and sync, modern API

Python
import httpx
 
# Sync
with httpx.Client(timeout=30, follow_redirects=True) as client:
    r = client.get("https://api.example.com/resources")
    r.raise_for_status()
    data = r.json()
 
# Async - concurrent requests
import asyncio
 
async def fetch_all(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient(timeout=30) as client:
        responses = await asyncio.gather(
            *[client.get(url) for url in urls],
            return_exceptions=True,
        )
        results = []
        for r in responses:
            if isinstance(r, Exception):
                print(f"Request failed: {r}")
            elif r.is_error:              # 4xx / 5xx - report it without aborting the batch
                print(f"HTTP {r.status_code} from {r.url}")
            else:
                results.append(r.json())
        return results
 
data = asyncio.run(fetch_all(["https://api.example.com/a", "https://api.example.com/b"]))

FastAPI - Modern API Framework

FastAPI is an async-capable web framework for building APIs, built on Starlette and Pydantic. It validates requests from type hints and generates OpenAPI (Swagger) docs automatically.

Basic app with types and routes

Python
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
 
app = FastAPI(
    title="My API",
    version="1.0.0",
    description="DevOps automation API"
)
 
class Item(BaseModel):
    name: str
    price: float
    is_active: bool = True
 
class ItemUpdate(BaseModel):            # every field optional, for PATCH
    name: str | None = None
    price: float | None = None
    is_active: bool | None = None
 
# GET - read all
@app.get("/items")
async def list_items():
    return [{"id": 1, "name": "Item 1"}]
 
# GET - read by path parameter
@app.get("/items/{item_id}")
async def read_item(item_id: int, skip: int = 0, limit: int = 10):
    return {"item_id": item_id, "skip": skip, "limit": limit}
 
# POST - create with request body
@app.post("/items", status_code=201)
async def create_item(item: Item):
    return {"created": item}
 
# PUT - full update
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item):
    return {"item_id": item_id, "item": item}
 
# PATCH - partial update
@app.patch("/items/{item_id}")
async def patch_item(item_id: int, item: ItemUpdate):
    return {"item_id": item_id, "updates": item}
 
# DELETE
@app.delete("/items/{item_id}")
async def delete_item(item_id: int):
    return {"deleted": item_id}
 
# Run with: uvicorn script:app --reload
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Response models and status codes

Python
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
 
app = FastAPI()
 
class Item(BaseModel):
    id: int
    name: str
 
class Error(BaseModel):
    detail: str
    code: int
 
@app.get(
    "/items/{item_id}",
    response_model=Item,
    status_code=status.HTTP_200_OK,
    responses={
        404: {"model": Error, "description": "Item not found"},
        500: {"model": Error, "description": "Internal error"}
    }
)
async def get_item(item_id: int):
    if item_id == 0:
        raise HTTPException(status_code=404, detail="Item not found")
    return {"id": item_id, "name": "Widget"}
 
# List response - each element is validated against Item
@app.get("/items", response_model=list[Item])
async def list_items():
    return [{"id": 1, "name": "Item 1"}, {"id": 2, "name": "Item 2"}]

Dependency injection

Python
from fastapi import FastAPI, Depends, HTTPException
 
app = FastAPI()
 
# Dependency - reusable logic (token arrives as a query parameter here)
async def get_current_user(token: str | None = None) -> dict:
    if not token:
        raise HTTPException(status_code=401, detail="Missing token")
    return {"username": "user", "token": token}
 
async def get_db_session():
    # Setup
    session = {"connection": "active"}
    yield session
    # Cleanup
    print("Closing connection")
 
# Use dependencies
@app.get("/protected")
async def protected_endpoint(current_user: dict = Depends(get_current_user)):
    return {"message": f"Hello {current_user['username']}"}
 
@app.get("/data")
async def get_data(
    user: dict = Depends(get_current_user),
    db: dict = Depends(get_db_session)
):
    return {"user": user, "db": db}

Middleware and error handling

Python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.middleware.cors import CORSMiddleware
import time
import logging
 
app = FastAPI()
 
logger = logging.getLogger(__name__)
 
# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://example.com", "http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
 
# Custom middleware
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start = time.perf_counter()          # monotonic clock - safer for durations than time.time()
    response = await call_next(request)
    process_time = time.perf_counter() - start
    response.headers["X-Process-Time"] = f"{process_time:.6f}"
    logger.info("%s %s - %.3fs", request.method, request.url.path, process_time)
    return response
 
# Global exception handler
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
    return JSONResponse(
        status_code=400,
        content={"detail": str(exc)}
    )

Testing with TestClient

Python
from fastapi import FastAPI
from fastapi.testclient import TestClient
 
app = FastAPI()
 
@app.get("/items/{item_id}")
async def get_item(item_id: int):
    return {"item_id": item_id, "name": "Widget"}
 
client = TestClient(app)
 
def test_get_item():
    response = client.get("/items/42")
    assert response.status_code == 200
    assert response.json() == {"item_id": 42, "name": "Widget"}
 
def test_invalid_id():
    response = client.get("/items/invalid")
    assert response.status_code == 422  # "invalid" is not an int - validation error
 
if __name__ == "__main__":
    test_get_item()
    test_invalid_id()
    print("All tests passed")

Running with Uvicorn (production)

Bash
# Development (auto-reload)
uvicorn script:app --reload --host 0.0.0.0 --port 8000
 
# Production (4 workers)
uvicorn script:app --host 0.0.0.0 --port 8000 --workers 4
 
# With env vars
uvicorn script:app --env-file .env --host 0.0.0.0 --port ${PORT}
 
# With custom log level
uvicorn script:app --log-level debug --access-log

Concurrency

asyncio - event loop and tasks

Python
import asyncio
import httpx
 
async def fetch(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        r.raise_for_status()
        return r.json()
 
async def main() -> None:
    # Run concurrently - all start immediately
    results = await asyncio.gather(
        fetch("https://api.example.com/a"),
        fetch("https://api.example.com/b"),
        fetch("https://api.example.com/c"),
    )
 
    # With timeout
    try:
        result = await asyncio.wait_for(fetch("https://slow.api.com"), timeout=10.0)
    except asyncio.TimeoutError:
        print("Timed out")
 
    # Semaphore - limit concurrency (e.g., avoid rate-limit errors)
    sem = asyncio.Semaphore(10)
    async def bounded_fetch(url: str) -> dict:
        async with sem:
            return await fetch(url)
 
    urls = [f"https://api.example.com/{i}" for i in range(50)]
    tasks = [bounded_fetch(url) for url in urls]
    all_results = await asyncio.gather(*tasks, return_exceptions=True)
 
asyncio.run(main())
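
To see the semaphore pattern above at work without a network, here is a minimal sketch that swaps the HTTP call for asyncio.sleep and records peak concurrency. run_bounded and its parameters are hypothetical names chosen for illustration.

```python
import asyncio


async def run_bounded(n_jobs: int, limit: int) -> int:
    """Run n_jobs fake requests, at most `limit` at a time; return the observed peak."""
    sem = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with sem:                  # waits here while `limit` jobs are already inside
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)    # stand-in for a network call
            running -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak


peak = asyncio.run(run_bounded(n_jobs=20, limit=5))
```

All 20 coroutines are scheduled immediately, but the counter inside the semaphore never exceeds the limit.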

ThreadPoolExecutor - I/O-bound parallelism

Python
from concurrent.futures import ThreadPoolExecutor, as_completed
 
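def call_api(endpoint: str) -> dict:
    # session comes from make_session() above; always set a timeout so a worker cannot hang
    return session.get(endpoint, timeout=30).json()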
 
endpoints = [f"https://api.example.com/{i}" for i in range(20)]
 
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = {pool.submit(call_api, ep): ep for ep in endpoints}
    for future in as_completed(futures):
        endpoint = futures[future]
        try:
            result = future.result()
        except Exception as e:
            print(f"Failed {endpoint}: {e}")

ProcessPoolExecutor - CPU-bound parallelism

Python
from concurrent.futures import ProcessPoolExecutor
 
def cpu_heavy(data: bytes) -> str:
    import hashlib
    return hashlib.sha256(data).hexdigest()
 
blobs = [b"data1", b"data2", b"data3"]
 
if __name__ == "__main__":   # required where workers are spawned (Windows, macOS)
    with ProcessPoolExecutor() as pool:
        hashes = list(pool.map(cpu_heavy, blobs))

Error Handling & Context Managers

Custom exceptions

Python
class AzureToolError(Exception):
    """Base exception for this tool."""
 
class ResourceNotFoundError(AzureToolError):
    def __init__(self, resource_type: str, name: str) -> None:
        self.resource_type = resource_type
        self.name = name
        super().__init__(f"{resource_type} '{name}' not found")
 
class ConfigurationError(AzureToolError):
    """Raised when required config is missing or invalid."""
 
# Raise
raise ResourceNotFoundError("Key Vault", "kv-prod-001")
 
# Catch hierarchy
try:
    result = get_resource(name)
except ResourceNotFoundError as e:
    print(f"Missing: {e.resource_type}/{e.name}")
except AzureToolError as e:
    print(f"Tool error: {e}")
except Exception:
    raise   # re-raise anything unexpected

Context managers

Python
from contextlib import contextmanager, suppress, nullcontext
from pathlib import Path
 
# Simple - suppress specific exceptions
with suppress(FileNotFoundError):
    Path("temp.json").unlink()
 
# Custom context manager as a generator
@contextmanager
def timer(label: str):
    import time
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"{label}: {elapsed:.2f}s")
 
with timer("terraform plan"):
    run("terraform plan -out tfplan.plan")
 
# Conditional context manager
def get_lock(enabled: bool):
    return acquire_lock() if enabled else nullcontext()
 
with get_lock(use_locking):
    do_work()

ExceptionGroup - multiple errors at once (3.11+)

Python
# Collect errors from a batch, then raise them together
errors = []
for url in urls:
    try:
        process(url)
    except Exception as e:
        errors.append(e)
 
if errors:
    raise ExceptionGroup("batch processing failed", errors)
 
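# Catch with except* - asyncio.TaskGroup (3.11+) raises an ExceptionGroup when
# tasks fail; asyncio.gather() propagates only the first exception.
async def fetch_many(urls: list[str]) -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            for url in urls:
                tg.create_task(fetch(url))
    except* httpx.HTTPStatusError as eg:
        for exc in eg.exceptions:
            print(f"HTTP error: {exc}")
    except* httpx.TimeoutException as eg:
        print(f"{len(eg.exceptions)} requests timed out")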

Functional Tools

functools

Python
from functools import lru_cache, cache, partial, reduce
 
# Memoisation - cache pure function results
@lru_cache(maxsize=128)
def get_region_display_name(shorthand: str) -> str:
    return REGION_MAP[shorthand]      # expensive lookup cached after first call
 
@cache                                # unbounded cache (3.9+)
def fibonacci(n: int) -> int:
    return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)
 
# partial - fix some arguments of a function
from azure.mgmt.resource import ResourceManagementClient
list_in_rg = partial(client.resources.list_by_resource_group, resource_group_name="rg-prod")
all_resources = list(list_in_rg())
 
# reduce - fold a sequence (imported above)
total_size = reduce(lambda acc, blob: acc + blob.size, blobs, 0)
 
# Cache with instance methods - use cached_property
from functools import cached_property
 
class AzureClient:
    @cached_property
    def resource_client(self) -> ResourceManagementClient:
        return ResourceManagementClient(self.credential, self.subscription_id)

itertools

Python
import itertools
 
# chain - flatten iterables
all_nics = list(itertools.chain(nic_list_a, nic_list_b, nic_list_c))
 
# chain.from_iterable - flatten one level
flat = list(itertools.chain.from_iterable(nested_list))
 
# islice - first N items from any iterable without materialising all
first_10 = list(itertools.islice(large_generator, 10))
 
# groupby - requires sorted input
sorted_resources = sorted(resources, key=lambda r: r.resource_group)
for rg, group in itertools.groupby(sorted_resources, key=lambda r: r.resource_group):
    print(f"{rg}: {list(group)}")
 
# product - cartesian product (same as Terraform's setproduct)
regions = ["UK South", "West Europe"]
envs    = ["prod", "staging"]
pairs   = list(itertools.product(regions, envs))
# [("UK South", "prod"), ("UK South", "staging"), ("West Europe", "prod"), ...]
 
# batched - process in chunks (3.12+)
for batch in itertools.batched(all_vms, 50):
    process_batch(batch)
 
# Pre-3.12 equivalent (yields tuples, like itertools.batched)
def batched(iterable, n: int):
    it = iter(iterable)
    while chunk := tuple(itertools.islice(it, n)):
        yield chunk
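
Why groupby needs sorted input: it only merges adjacent items with the same key. A small sketch with made-up resource group names shows the difference.

```python
import itertools

resource_groups = ["rg-a", "rg-b", "rg-a"]   # made-up names for illustration

# Unsorted: the two "rg-a" entries are not adjacent, so they form separate groups
unsorted_counts = [(key, len(list(group))) for key, group in itertools.groupby(resource_groups)]

# Sorted first: equal keys become adjacent and collapse into one group
sorted_counts = [(key, len(list(group))) for key, group in itertools.groupby(sorted(resource_groups))]
```

When sorting is not practical, a defaultdict(list) keyed on the same value gives the grouped result without the adjacency requirement.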

Dependency Injection

Python needs no DI framework - the idiom is constructor injection: a class receives its collaborators instead of building them. Depend on a Protocol (structural typing), so the code is decoupled from concrete implementations and trivially testable with fakes.

Python
from collections.abc import Callable
from datetime import datetime
from typing import Protocol
 
# 1. Describe the dependency as a Protocol - depend on behaviour, not a concrete class.
#    Anything with these methods satisfies it; no inheritance required.
class BlobStore(Protocol):
    def get(self, key: str) -> bytes: ...
    def put(self, key: str, data: bytes) -> None: ...
 
# 2. Inject collaborators through __init__; the service never constructs them itself.
class ReportService:
    def __init__(self, store: BlobStore, clock: Callable[[], datetime]) -> None:
        self._store = store
        self._clock = clock
 
    def save(self, name: str, body: bytes) -> None:
        key = f"{self._clock():%Y/%m/%d}/{name}"
        self._store.put(key, body)

Composition root - wire the graph once

Construct concrete implementations in exactly one place (the entry point), then pass them down. Business code stays free of import azure... and global singletons.

Python
def main() -> None:
    store = AzureBlobStore(account="ldoprod")          # concrete choice lives here only
    service = ReportService(store=store, clock=datetime.now)
    service.save("daily.csv", b"...")

The payoff: testing with fakes, no patching

Python
# A hand-written fake beats mock.patch - it is type-checked and has no global state.
class FakeStore:
    def __init__(self) -> None:
        self.saved: dict[str, bytes] = {}
    def get(self, key: str) -> bytes:
        return self.saved[key]
    def put(self, key: str, data: bytes) -> None:
        self.saved[key] = data
 
def test_save_uses_a_date_prefix() -> None:
    store = FakeStore()
    service = ReportService(store=store, clock=lambda: datetime(2026, 5, 31))
    service.save("daily.csv", b"x")
    assert "2026/05/31/daily.csv" in store.saved   # deterministic clock, no real Azure

Functions: inject callables, not globals

Python
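# Pass behaviour in as a parameter (with a sensible default) instead of reaching
# for a module-level singleton. Easy to override in a test or a different context.
import requests
 
def _get_bytes(url: str) -> bytes:      # requests.get returns a Response, so adapt it to bytes
    return requests.get(url, timeout=30).content
 
def fetch(url: str, *, http_get: Callable[[str], bytes] = _get_bytes) -> bytes:
    return http_get(url)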
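
A sketch of how a test might exercise this pattern. The names download, _urlopen_get, and fake_get are hypothetical, and the default getter uses urllib from the standard library so the example has no third-party dependency.

```python
from collections.abc import Callable
from urllib.request import urlopen


def _urlopen_get(url: str) -> bytes:
    # Default implementation: a real HTTP GET via the standard library
    with urlopen(url, timeout=30) as resp:
        return resp.read()


def download(url: str, *, http_get: Callable[[str], bytes] = _urlopen_get) -> bytes:
    return http_get(url)


def test_download_uses_injected_getter() -> None:
    calls: list[str] = []

    def fake_get(url: str) -> bytes:     # records the call instead of touching the network
        calls.append(url)
        return b"payload"

    assert download("https://example.invalid/report", http_get=fake_get) == b"payload"
    assert calls == ["https://example.invalid/report"]
```

The test never patches a module attribute; the fake is just another argument.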

FastAPI: Depends + overrides

FastAPI has DI built in (see the FastAPI section). Declare dependencies with Annotated[T, Depends(...)], and swap them for fakes in tests via app.dependency_overrides - no monkeypatching of routes.

Python
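from typing import Annotated
from fastapi import Depends, FastAPI, Response
 
def get_store() -> BlobStore:                          # provider
    return AzureBlobStore(account="ldoprod")
 
app = FastAPI()
 
@app.get("/report/{name}")
def read(name: str, store: Annotated[BlobStore, Depends(get_store)]) -> Response:
    # Return raw bytes in a Response; a bare bytes return would be JSON-encoded
    return Response(content=store.get(name), media_type="application/octet-stream")
 
# In a test, replace the real provider without touching the route:
app.dependency_overrides[get_store] = lambda: FakeStore()
# ...and reset afterwards so overrides do not leak between tests:
app.dependency_overrides.clear()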

When to add a container: hand-wiring is the default and is enough for most services. Reach for a lightweight registry (svcs, punq, or dependency-injector) only when the object graph and its lifetimes (singletons, per-request scopes) become tedious to assemble by hand - never sooner.
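
For a sense of what such a registry does, here is a deliberately tiny hand-rolled sketch. Registry, register, and get are hypothetical names and do not reflect the API of svcs, punq, or dependency-injector; it only illustrates mapping a type to a factory with an optional singleton lifetime.

```python
from collections.abc import Callable
from typing import Any, TypeVar

T = TypeVar("T")


class Registry:
    """Hypothetical minimal container: type -> factory, with optional singleton caching."""

    def __init__(self) -> None:
        self._factories: dict[type, tuple[Callable[[], Any], bool]] = {}
        self._singletons: dict[type, Any] = {}

    def register(self, key: type[T], factory: Callable[[], T], *, singleton: bool = True) -> None:
        self._factories[key] = (factory, singleton)

    def get(self, key: type[T]) -> T:
        factory, singleton = self._factories[key]   # KeyError if nothing was registered
        if not singleton:
            return factory()                        # fresh object on every lookup
        if key not in self._singletons:
            self._singletons[key] = factory()       # built lazily, then reused
        return self._singletons[key]


class Settings:                                     # hypothetical dependency
    pass


registry = Registry()
registry.register(Settings, Settings)               # shared instance
registry.register(list, list, singleton=False)      # new list per lookup
```

If this is all a project needs, passing the objects through constructors is usually still simpler.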


Logging & Output

🛠️ Deeper reference - this section covers structured JSON logging, context propagation via contextvars, LoggerAdapter, timing decorators, and structlog. It’s a mini-guide, not a quick-reference snippet.

Standard logging setup

Python
import logging
import sys
import time
 
def configure_logging(level: str = "INFO") -> None:
    logging.Formatter.converter = time.gmtime   # log in UTC so the "Z" suffix below is accurate
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format="%(asctime)s  %(levelname)-8s  %(name)s  %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%SZ",
        stream=sys.stdout,
        force=True,       # reconfigure even if already configured
    )
    # Quieten noisy libraries
    logging.getLogger("azure").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
 
logger = logging.getLogger(__name__)
logger.info("Deploying %s to %s", resource_name, location)
logger.warning("Soft-delete is disabled on %r", vault_name)
logger.exception("Unhandled error")   # logs traceback at ERROR level

Coloured console formatter

Python
import logging
 
class ColouredFormatter(logging.Formatter):
    FORMATS = {
        logging.DEBUG:    "\033[1;36m",   # cyan
        logging.INFO:     "\033[0;35m",   # purple
        logging.WARNING:  "\033[1;33m",   # yellow
        logging.ERROR:    "\033[0;31m",   # red
        logging.CRITICAL: "\033[1;31m",   # red bold
    }
    RESET = "\033[0m"
    FMT   = "%(asctime)s  %(levelname)-8s  %(message)s"
 
    def format(self, record: logging.LogRecord) -> str:
        colour = self.FORMATS.get(record.levelno, "")
        return logging.Formatter(colour + self.FMT + self.RESET).format(record)
 
handler = logging.StreamHandler()
handler.setFormatter(ColouredFormatter())
logging.getLogger().addHandler(handler)

rich - structured terminal output

Python
from rich.console import Console
from rich.table import Table
from rich.progress import track
from rich import print as rprint
 
console = Console()
 
# Styled output
console.print("[bold green]✓[/] Deployment complete")
console.print(f"[red]✗[/] Failed: [yellow]{error_msg}[/]")
console.print_exception()   # rich traceback; call it inside an except block
 
# Table
table = Table(title="Azure Resources")
table.add_column("Name",           style="cyan")
table.add_column("Resource Group", style="green")
table.add_column("Location")
table.add_column("Type",           style="dim")
 
for r in resources:
    table.add_row(r.name, r.resource_group, r.location, r.type)
 
console.print(table)
 
# Progress bar over any iterable
for resource in track(resources, description="Processing..."):
    process(resource)

Structured (JSON) logging - production default

Free-text logs are hard to parse and query at scale. For anything running in containers, App Service, or a serverless function, emit one JSON object per line - most log shippers will pick it up without extra parsing rules. python-json-logger (pip install python-json-logger) is the no-fuss option; the standard logging library can do the same on any supported Python version with a small custom Formatter subclass.

Python
import logging
import sys
from pythonjsonlogger.json import JsonFormatter
 
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter(
    "%(asctime)s %(levelname)s %(name)s %(message)s",
    rename_fields={"asctime": "ts", "levelname": "level"},
))
 
root = logging.getLogger()
root.setLevel(logging.INFO)
root.handlers = [handler]
 
log = logging.getLogger(__name__)
log.info("deploy started", extra={"env": "prod", "resource_group": "rg-app"})
# {"ts": "...", "level": "INFO", "name": "myapp", "message": "deploy started", "env": "prod", "resource_group": "rg-app"}

The extra= dict promotes keys into the JSON record - prefer this to f"deploy {env}" so each field is queryable.
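
If an extra dependency is not an option, a standard-library formatter can produce similar output. This is a minimal sketch, not a drop-in replacement for python-json-logger: StdlibJsonFormatter and the "json_demo" logger name are hypothetical, and the field names simply mirror the example above.

```python
import io
import json
import logging
from datetime import datetime, timezone

# Attributes every LogRecord carries; anything else arrived via extra=.
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


class StdlibJsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
        }
        # Promote extra= keys to top-level JSON fields.
        payload.update(
            {k: v for k, v in vars(record).items() if k not in _STANDARD_ATTRS}
        )
        if record.exc_info:
            payload["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(payload, default=str)


stream = io.StringIO()   # stands in for sys.stdout so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(StdlibJsonFormatter())

demo_log = logging.getLogger("json_demo")
demo_log.setLevel(logging.INFO)
demo_log.propagate = False
demo_log.addHandler(handler)

demo_log.info("deploy started", extra={"env": "prod", "resource_group": "rg-app"})
line = json.loads(stream.getvalue())
```

The default=str argument keeps the formatter from raising on values such as datetimes or Paths that json cannot serialise directly.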

Context propagation - contextvars + filter

Adding correlation_id, user_id, or request_id to every log line in a request scope is a job for contextvars plus a logging filter. Works across async boundaries (unlike threading.local).

Python
import contextvars
import logging
 
_correlation_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
    "correlation_id", default=None
)
 
class ContextFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = _correlation_id.get() or "-"
        return True
 
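# Attach the filter to handlers rather than the root logger: a logger-level filter
# is skipped for records that propagate up from child loggers.
for handler in logging.getLogger().handlers:
    handler.addFilter(ContextFilter())
 
log = logging.getLogger(__name__)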
 
# In an HTTP middleware / message handler:
async def handle(request):
    token = _correlation_id.set(request.headers.get("x-correlation-id", "..."))
    try:
        log.info("handling request")   # every nested log gets correlation_id automatically
        await do_work()
    finally:
        _correlation_id.reset(token)

For the JSON formatter, just include %(correlation_id)s in the format string (or use a custom formatter that reads any attribute starting with _ctx_).
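
To see the isolation in action, the sketch below runs two concurrent tasks that each set their own ID. All names here (request_id, RequestIdFilter, "ctx_demo") are hypothetical, and a plain text format stands in for the JSON formatter.

```python
import asyncio
import contextvars
import io
import logging

request_id: contextvars.ContextVar[str] = contextvars.ContextVar("request_id", default="-")


class RequestIdFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id.get()
        return True


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.addFilter(RequestIdFilter())   # on the handler, so every logger is covered
handler.setFormatter(logging.Formatter("%(request_id)s %(message)s"))

ctx_log = logging.getLogger("ctx_demo")
ctx_log.setLevel(logging.INFO)
ctx_log.propagate = False
ctx_log.addHandler(handler)


async def handle(rid: str) -> None:
    token = request_id.set(rid)
    try:
        await asyncio.sleep(0)         # yield so the two tasks interleave
        ctx_log.info("handling")
    finally:
        request_id.reset(token)


async def main() -> None:
    # gather() wraps each coroutine in a task, and each task gets its own context copy.
    await asyncio.gather(handle("req-a"), handle("req-b"))


asyncio.run(main())
lines = sorted(stream.getvalue().splitlines())
```

Each task logs its own ID even though both were suspended at the same await point.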

LoggerAdapter - bind context to a logger instance

When the context is known at construction time (e.g. one logger per tenant or per job), LoggerAdapter is simpler than contextvars.

Python
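import logging
 
class JobLogger(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        # Merge the adapter context into any call-site extra= (adapter keys win on conflict)
        kwargs.setdefault("extra", {}).update(self.extra)
        return msg, kwargs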
 
base_log = logging.getLogger("jobs")
job_log  = JobLogger(base_log, {"job_id": "abc123", "tenant": "acme"})
job_log.info("started")          # every line carries job_id + tenant
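
A small check that the adapter context and call-site fields both reach the record. ListHandler and the "adapter_demo" logger name are hypothetical helpers for this sketch only.

```python
import logging


class JobLogger(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        kwargs.setdefault("extra", {}).update(self.extra)
        return msg, kwargs


class ListHandler(logging.Handler):
    """Collects records in memory so the demo can inspect them."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


capture = ListHandler()
base = logging.getLogger("adapter_demo")
base.setLevel(logging.INFO)
base.propagate = False
base.addHandler(capture)

job_log = JobLogger(base, {"job_id": "abc123"})
job_log.info("step done", extra={"step": 3})   # call-site field plus bound context

record = capture.records[0]
```

Both job_id and step end up as attributes on the LogRecord, so a JSON formatter can emit them as separate fields.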

Logging decorator - timing and error context

For a clean cross-cutting timing/error pattern, wrap callables with a decorator. The sync version comes first, followed by a parallel wrapper for async callables.

Python
import functools
import logging
import time
from collections.abc import Awaitable, Callable
from typing import ParamSpec, TypeVar
 
log = logging.getLogger(__name__)
P = ParamSpec("P")
R = TypeVar("R")
 
def logged(level: int = logging.INFO) -> Callable[[Callable[P, R]], Callable[P, R]]:
    def decorator(fn: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(fn)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            start = time.perf_counter()
            log.log(level, "%s starting", fn.__qualname__)
            try:
                result = fn(*args, **kwargs)
                log.log(level, "%s ok in %.1fms", fn.__qualname__,
                        (time.perf_counter() - start) * 1000)
                return result
            except Exception:
                log.exception("%s failed after %.1fms", fn.__qualname__,
                              (time.perf_counter() - start) * 1000)
                raise
        return wrapper
    return decorator
 
# Async version
def logged_async(level: int = logging.INFO):
    def decorator(fn: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
        @functools.wraps(fn)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            start = time.perf_counter()
            log.log(level, "%s starting", fn.__qualname__)
            try:
                result = await fn(*args, **kwargs)
                log.log(level, "%s ok in %.1fms", fn.__qualname__,
                        (time.perf_counter() - start) * 1000)
                return result
            except Exception:
                log.exception("%s failed after %.1fms", fn.__qualname__,
                              (time.perf_counter() - start) * 1000)
                raise
        return wrapper
    return decorator
 
@logged()
def process_batch(items: list[str]) -> int: ...
 
@logged_async()
async def fetch_user(user_id: str) -> dict: ...
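
If maintaining two decorators feels heavy, one alternative is to pick the wrapper at decoration time with inspect.iscoroutinefunction. The timed decorator below is a hypothetical, simplified sketch (no level argument, no ParamSpec typing), not a replacement for the typed versions above.

```python
import asyncio
import functools
import inspect
import logging
import time

log = logging.getLogger(__name__)


def timed(fn):
    """Log the duration of a sync or async callable."""

    def _report(start: float) -> None:
        log.info("%s took %.1fms", fn.__qualname__, (time.perf_counter() - start) * 1000)

    if inspect.iscoroutinefunction(fn):
        @functools.wraps(fn)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await fn(*args, **kwargs)
            finally:
                _report(start)   # runs on success and on failure
        return async_wrapper

    @functools.wraps(fn)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            _report(start)
    return sync_wrapper


@timed
def add(a: int, b: int) -> int:
    return a + b


@timed
async def add_async(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


sync_result = add(1, 2)
async_result = asyncio.run(add_async(3, 4))
```

The trade-off is weaker static typing: a type checker cannot easily express "returns the same kind of callable it was given" without overloads.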

structlog - if you’d rather not hand-roll any of this

Python
import structlog
 
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,   # auto-pick contextvars
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.dict_tracebacks,
        structlog.processors.JSONRenderer(),
    ],
)
 
log = structlog.get_logger()
log.info("deploy_started", env="prod", resource_group="rg-app")
 
# Bind once, use many - returns a new logger with the context baked in
job_log = log.bind(job_id="abc123", tenant="acme")
job_log.info("started")
job_log.warning("retrying", attempt=2)

Log-level guidance

Level | Use for
DEBUG | Local diagnostics, payload dumps - off in prod
INFO | Business events: deploy started, batch processed
WARNING | Recoverable issue, retry succeeded, fallback used
ERROR | An operation failed; caller will see a failure
CRITICAL | Process is unhealthy / about to exit - alert on it

Common pitfalls

  • Don’t use print() for diagnostics. It bypasses levels, handlers, and formatters - and goes to stdout, mixing with real output.
  • Don’t f-string the message for production lines: log.info(f"user {uid} did {x}") builds the string even when the level is disabled. Use lazy formatting: log.info("user %s did %s", uid, x).
  • Don’t log secrets. Mask tokens/passwords/connection strings at the call site - the log backend is not a vault.
  • Use log.exception(...) in except: blocks - it captures the traceback automatically at ERROR level. log.error(str(e)) loses the stack.
  • Configure logging in one place (entrypoint or main()), not in library modules. Libraries should call logging.getLogger(__name__) and let the app decide handlers/levels.
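
The lazy-formatting point can be checked directly. In this sketch, Expensive is a hypothetical class that counts how often it is converted to a string, and "lazy_demo" is an arbitrary logger name.

```python
import logging


class Expensive:
    calls = 0

    def __str__(self) -> str:
        Expensive.calls += 1
        return "expensive"


lazy_log = logging.getLogger("lazy_demo")
lazy_log.setLevel(logging.WARNING)        # INFO is disabled
lazy_log.propagate = False
lazy_log.addHandler(logging.NullHandler())

obj = Expensive()

lazy_log.info("value=%s", obj)            # lazy: arguments are never formatted
calls_after_lazy = Expensive.calls

lazy_log.info(f"value={obj}")             # eager: the f-string runs before info() is called
calls_after_eager = Expensive.calls
```

With %-style arguments the conversion is skipped entirely when the level is disabled; the f-string pays for it every time.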

Testing

🛠️ Deeper reference - this section covers project layout, fixtures, parametrize, Azure SDK mocking, monkeypatching, async tests, and temporary files. It’s a mini-guide, not a quick-reference snippet.

Project structure

TEXT
src/
  my_tool/
    __init__.py
    azure_client.py
    config.py
tests/
  conftest.py         # shared fixtures
  test_azure_client.py
  test_config.py

conftest.py - shared fixtures

Python
import pytest
from unittest.mock import MagicMock, patch
from my_tool.config import Config
 
@pytest.fixture
def config() -> Config:
    return Config(
        subscription_id="00000000-0000-0000-0000-000000000000",
        tenant_id="11111111-1111-1111-1111-111111111111",
        environment="test",
    )
 
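# Patch names where they are looked up, not where they are defined. This assumes
# my_tool/azure_client.py uses `from azure.identity import DefaultAzureCredential`
# and `from azure.mgmt.resource import ResourceManagementClient`.
@pytest.fixture
def mock_credential():
    with patch("my_tool.azure_client.DefaultAzureCredential") as mock:
        yield mock.return_value
 
@pytest.fixture
def mock_resource_client(mock_credential):
    with patch("my_tool.azure_client.ResourceManagementClient") as mock:
        client = mock.return_value
        client.resource_groups.list.return_value = []
        yield client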

Parametrize - data-driven tests

Python
import pytest
from my_tool.utils import lookup_region, validate_vm_name
 
@pytest.mark.parametrize("shorthand,expected", [
    ("uks",  "UK South"),
    ("euw",  "West Europe"),
    ("eus",  "East US"),
    ("eus2", "East US 2"),
])
def test_region_lookup(shorthand: str, expected: str) -> None:
    assert lookup_region(shorthand) == expected
 
@pytest.mark.parametrize("name,valid", [
    ("vm-web-prod-01", True),
    ("vm-web-prod",   True),
    ("VM Web Prod",   False),   # spaces not allowed
    ("a",             False),   # too short
    ("a" * 20,        False),   # too long
])
def test_vm_name_validation(name: str, valid: bool) -> None:
    assert validate_vm_name(name) == valid
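
The tests above import lookup_region and validate_vm_name without showing them. One hypothetical implementation that satisfies those cases is sketched below; the naming rule (2-15 lowercase letters, digits, or hyphens, starting and ending with a letter or digit) is an assumption chosen to match the test data, not an official Azure rule.

```python
import re

# Hypothetical shorthand table, limited to the values used in the tests.
_REGIONS = {
    "uks": "UK South",
    "euw": "West Europe",
    "eus": "East US",
    "eus2": "East US 2",
}

# First char + up to 13 middle chars + last char = 2 to 15 characters in total.
_VM_NAME = re.compile(r"[a-z0-9](?:[a-z0-9-]{0,13}[a-z0-9])")


def lookup_region(shorthand: str) -> str:
    return _REGIONS[shorthand.lower()]


def validate_vm_name(name: str) -> bool:
    return _VM_NAME.fullmatch(name) is not None
```

Unknown shorthands raise KeyError here; a real tool might prefer a clearer error message.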

Mocking Azure SDK calls

Python
from unittest.mock import MagicMock, patch, PropertyMock
import pytest
 
def test_list_resource_groups(mock_resource_client, config):
    from my_tool.azure_client import AzureClient
 
    # Set up mock return value
    rg1 = MagicMock()
    rg1.name     = "rg-app-prod"
    rg1.location = "uksouth"
 
    mock_resource_client.resource_groups.list.return_value = [rg1]
 
    client = AzureClient(config)
    groups = client.list_resource_groups()
 
    assert len(groups) == 1
    assert groups[0].name == "rg-app-prod"
    mock_resource_client.resource_groups.list.assert_called_once()
 
 
def test_key_vault_secret(config):
    # Patch SecretClient where my_tool.secrets looks it up (assumes a from-import in that module)
    with patch("my_tool.secrets.SecretClient") as MockKV:
        mock_secret       = MagicMock()
        mock_secret.value = "super-secret"
        MockKV.return_value.get_secret.return_value = mock_secret
 
        from my_tool.secrets import get_secret
        value = get_secret("https://kv-prod.vault.azure.net", "db-password")
 
        assert value == "super-secret"
        MockKV.return_value.get_secret.assert_called_once_with("db-password")

monkeypatch - modify environment and imports

Python
import pytest
from my_tool.config import Config
 
def test_config_from_env(monkeypatch):
    monkeypatch.setenv("AZURE_SUBSCRIPTION_ID", "test-sub")
    monkeypatch.setenv("AZURE_TENANT_ID",       "test-tenant")
    monkeypatch.setenv("ENVIRONMENT",           "test")
 
    config = Config()
    assert config.subscription_id == "test-sub"
    assert config.environment      == "test"
 
def test_missing_env_var_raises(monkeypatch):
    monkeypatch.delenv("AZURE_SUBSCRIPTION_ID", raising=False)
    with pytest.raises(KeyError):
        Config()

Async tests - pytest-asyncio

Python
import pytest
import httpx
from unittest.mock import AsyncMock, MagicMock, patch
 
@pytest.mark.asyncio
async def test_fetch_all():
    mock_response = MagicMock()
    mock_response.json.return_value = {"id": "1"}
    mock_response.raise_for_status  = MagicMock()
 
    with patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=mock_response):
        from my_tool.http import fetch_all
        results = await fetch_all(["https://api.example.com/a"])
        assert len(results) == 1
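
The same idea works without patching if the client is passed in. The sketch below uses a hypothetical fetch_all that accepts any object with an awaitable get(); it runs under plain asyncio, so no pytest plugin is needed to try it.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def fetch_all(client, urls: list[str]) -> list[dict]:
    async def fetch_one(url: str) -> dict:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

    # Run the requests concurrently and keep the results in input order.
    return await asyncio.gather(*(fetch_one(u) for u in urls))


response = MagicMock()
response.json.return_value = {"id": "1"}

client = MagicMock()
client.get = AsyncMock(return_value=response)   # awaiting get() yields the fake response

results = asyncio.run(
    fetch_all(client, ["https://api.example.com/a", "https://api.example.com/b"])
)
```

AsyncMock records awaits as well as calls, so client.get.await_count can be asserted alongside the results.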

Temporary files and directories

Python
import pytest
from pathlib import Path
 
@pytest.fixture
def tmp_config(tmp_path: Path) -> Path:
    config = tmp_path / "config.yaml"
    config.write_text("environment: test\nlocation: UK South\n")
    return config
 
def test_load_config(tmp_config: Path) -> None:
    from my_tool.config import load_from_file
    cfg = load_from_file(tmp_config)
    assert cfg["environment"] == "test"

Azure Authentication

All credential types

Python
import os
from azure.identity import (
    DefaultAzureCredential,
    ManagedIdentityCredential,
    WorkloadIdentityCredential,
    ClientSecretCredential,
    AzureCliCredential,
    ChainedTokenCredential,
    EnvironmentCredential,
)
 
# ✅ DefaultAzureCredential - tries: env vars → workload identity → managed identity → CLI
# Best choice for code that must run in both CI and locally (as of 2026)
credential = DefaultAzureCredential()
 
# System-assigned managed identity (no config needed on the VM/Function)
credential = ManagedIdentityCredential()
 
# User-assigned managed identity - must specify client_id
credential = ManagedIdentityCredential(client_id="your-uami-client-id")
 
# ✅ OIDC / Workload Identity Federation - GitHub Actions, Azure DevOps, etc.
# Current preferred CI/CD auth pattern (as of 2026)
# Reads AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_FEDERATED_TOKEN_FILE from environment
credential = WorkloadIdentityCredential()
 
# ⚠️ Service principal with client secret - legacy, avoid in new projects
credential = ClientSecretCredential(
    tenant_id=os.environ["AZURE_TENANT_ID"],
    client_id=os.environ["AZURE_CLIENT_ID"],
    client_secret=os.environ["AZURE_CLIENT_SECRET"],
)
 
# Explicit chain - try managed identity, fall back to CLI
credential = ChainedTokenCredential(
    ManagedIdentityCredential(),
    AzureCliCredential(),
)

Credential factory - pick based on environment

Python
import os
from azure.identity import (
    DefaultAzureCredential,
    ManagedIdentityCredential,
    ClientSecretCredential,
    AzureCliCredential,
    WorkloadIdentityCredential,
)
from azure.core.credentials import TokenCredential
 
def get_credential(env: str = "prod") -> TokenCredential:
    match env:
        case "prod" | "staging":
            # Managed identity in Azure-hosted environments
            client_id = os.environ.get("AZURE_CLIENT_ID")
            return ManagedIdentityCredential(client_id=client_id) if client_id \
                   else ManagedIdentityCredential()
        case "ci":
            return WorkloadIdentityCredential()
        case "dev":
            return AzureCliCredential()
        case _:
            return DefaultAzureCredential()

See also: Azure - Auth & Context for creating the service principals and managed identities used by DefaultAzureCredential.


Azure SDK Patterns

Resource groups

Python
from azure.mgmt.resource import ResourceManagementClient
from azure.identity import DefaultAzureCredential
import os
 
credential      = DefaultAzureCredential()
subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]
client          = ResourceManagementClient(credential, subscription_id)
 
# List all resource groups
groups = list(client.resource_groups.list())
 
# Filter by tag
prod_groups = [
    rg for rg in groups
    if rg.tags and rg.tags.get("Environment") == "prod"
]
 
# Get a specific resource group
rg = client.resource_groups.get("rg-app-prod")
print(rg.location, rg.tags)
 
# Create
client.resource_groups.create_or_update(
    "rg-new-prod",
    {"location": "uksouth", "tags": {"Environment": "prod", "ManagedBy": "Terraform"}},
)
 
# List all resources in a resource group
resources = list(client.resources.list_by_resource_group("rg-app-prod"))
vms = [r for r in resources if r.type == "Microsoft.Compute/virtualMachines"]

Key Vault - secrets

Python
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from azure.core.exceptions import ResourceNotFoundError
 
def make_kv_client(vault_url: str) -> SecretClient:
    return SecretClient(vault_url=vault_url, credential=DefaultAzureCredential())
 
# Get a secret value
def get_secret(vault_url: str, name: str, version: str | None = None) -> str:
    client = make_kv_client(vault_url)
    return client.get_secret(name, version=version).value
 
# Get with fallback
def get_secret_or_default(vault_url: str, name: str, default: str = "") -> str:
    try:
        return get_secret(vault_url, name)
    except ResourceNotFoundError:
        return default
 
# Set
def set_secret(vault_url: str, name: str, value: str, **properties) -> None:
    make_kv_client(vault_url).set_secret(name, value, **properties)
 
# List secret names
def list_secrets(vault_url: str) -> list[str]:
    return [s.name for s in make_kv_client(vault_url).list_properties_of_secrets()]
 
# Bulk load secrets into a dict
def load_secrets(vault_url: str, names: list[str]) -> dict[str, str]:
    client = make_kv_client(vault_url)
    return {name: client.get_secret(name).value for name in names}
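
Each helper above builds a new client, and with it a new credential, on every call. If that becomes a bottleneck, memoising the factory is one option. The sketch below shows the idea with a hypothetical FakeSecretClient so it runs without the Azure SDK; in real code the cached function would return a SecretClient.

```python
import functools


class FakeSecretClient:
    """Hypothetical stand-in that counts how many clients get constructed."""

    instances = 0

    def __init__(self, vault_url: str) -> None:
        FakeSecretClient.instances += 1
        self.vault_url = vault_url


@functools.cache
def make_client(vault_url: str) -> FakeSecretClient:
    # One client per distinct vault URL for the lifetime of the process.
    return FakeSecretClient(vault_url)


first = make_client("https://kv-prod.vault.azure.net")
second = make_client("https://kv-prod.vault.azure.net")
other = make_client("https://kv-dev.vault.azure.net")
```

A process-wide cache suits scripts and long-lived services; tests can call make_client.cache_clear() to reset it.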

Storage blobs

Python
from azure.identity import DefaultAzureCredential
from azure.storage.blob import (
    BlobServiceClient,
    BlobSasPermissions,
    generate_blob_sas,
)
from datetime import datetime, timedelta, timezone
 
def make_blob_client(account_name: str) -> BlobServiceClient:
    return BlobServiceClient(
        account_url=f"https://{account_name}.blob.core.windows.net",
        credential=DefaultAzureCredential(),
    )
 
client = make_blob_client("myaccount")
 
# Upload bytes or text (str is encoded as UTF-8)
def upload(container: str, name: str, data: bytes | str) -> None:
    if isinstance(data, str):
        data = data.encode()
    client.get_blob_client(container, name).upload_blob(data, overwrite=True)
 
# Upload a local file
def upload_file(container: str, blob_name: str, local_path: str) -> None:
    with open(local_path, "rb") as f:
        client.get_blob_client(container, blob_name).upload_blob(f, overwrite=True)
 
# Download
def download(container: str, name: str) -> bytes:
    return client.get_blob_client(container, name).download_blob().readall()
 
# List blobs with optional prefix
def list_blobs(container: str, prefix: str = "") -> list[str]:
    cc = client.get_container_client(container)
    return [b.name for b in cc.list_blobs(name_starts_with=prefix)]
 
# Generate a time-limited SAS URL (for unauthenticated sharing)
def sas_url(account_name: str, account_key: str, container: str, blob: str, hours: int = 1) -> str:
    token = generate_blob_sas(
        account_name=account_name,
        container_name=container,
        blob_name=blob,
        account_key=account_key,
        permission=BlobSasPermissions(read=True),
        expiry=datetime.now(timezone.utc) + timedelta(hours=hours),
    )
    return f"https://{account_name}.blob.core.windows.net/{container}/{blob}?{token}"
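
Blob names may contain spaces or other characters that need percent-encoding before they go into a URL. A minimal sketch of a URL builder that handles this, using only the standard library; blob_url and the placeholder token are hypothetical.

```python
from urllib.parse import quote


def blob_url(account_name: str, container: str, blob: str, sas_token: str = "") -> str:
    # Keep "/" so virtual directories survive; encode everything else that needs it.
    path = quote(blob, safe="/")
    url = f"https://{account_name}.blob.core.windows.net/{container}/{path}"
    return f"{url}?{sas_token}" if sas_token else url


url = blob_url("myaccount", "reports", "2026/05/daily report.csv", "sig=PLACEHOLDER")
plain = blob_url("myaccount", "reports", "a.csv")
```

The SAS token itself is already URL-encoded when it comes back from the SDK, so it is appended as-is.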

Azure Monitor - query metrics

Python
from azure.identity import DefaultAzureCredential
from azure.monitor.query import MetricsQueryClient, MetricAggregationType
from datetime import timedelta
 
metrics_client = MetricsQueryClient(DefaultAzureCredential())
 
response = metrics_client.query_resource(
    resource_id="/subscriptions/.../resourceGroups/rg-prod/providers/Microsoft.Compute/virtualMachines/vm-prod",
    metric_names=["Percentage CPU", "Available Memory Bytes"],
    timespan=timedelta(hours=1),
    granularity=timedelta(minutes=5),
    aggregations=[MetricAggregationType.AVERAGE, MetricAggregationType.MAXIMUM],
)
 
for metric in response.metrics:
    print(f"\n{metric.name}")
    for ts in metric.timeseries:
        for point in ts.data:
            if point.average is not None:
                print(f"  {point.timestamp}  avg={point.average:.1f}")

Azure Instance Metadata Service (from inside a VM)

Python
import json, requests, os
 
os.environ["NO_PROXY"] = "*"   # bypass any proxy for link-local
 
url  = "http://169.254.169.254/metadata/instance?api-version=2021-02-01"
data = requests.get(url, headers={"Metadata": "true"}, timeout=5).json()
 
compute = data["compute"]
print(f"Subscription: {compute['subscriptionId']}")
print(f"Resource group: {compute['resourceGroupName']}")
print(f"VM name: {compute['name']}")
print(f"Location: {compute['location']}")
print(f"VM size: {compute['vmSize']}")
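
Setting NO_PROXY in os.environ affects the whole process. As an alternative sketch, the standard library can bypass proxies for this one call only. build_imds_request and fetch_imds are hypothetical helper names, and fetch_imds only succeeds when run on an Azure VM.

```python
import json
import urllib.request

IMDS_URL = "http://169.254.169.254/metadata/instance?api-version=2021-02-01"


def build_imds_request() -> urllib.request.Request:
    # IMDS rejects requests that lack the Metadata: true header.
    return urllib.request.Request(IMDS_URL, headers={"Metadata": "true"})


def fetch_imds(timeout: float = 5.0) -> dict:
    # An empty ProxyHandler disables proxies for this opener only.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    with opener.open(build_imds_request(), timeout=timeout) as resp:
        return json.load(resp)


request = build_imds_request()   # safe to build anywhere; no network access yet
```

Calling fetch_imds()["compute"] then yields the same fields as the requests-based version above.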

Azure Function - timer trigger with managed identity

Python
import azure.functions as func
import logging
from datetime import datetime, timezone
from azure.identity import ManagedIdentityCredential
from azure.mgmt.resource import ResourceManagementClient
import os
 
app = func.FunctionApp()
 
@app.timer_trigger(schedule="0 */5 * * * *", arg_name="timer")
def resource_audit(timer: func.TimerRequest) -> None:
    credential = ManagedIdentityCredential(
        client_id=os.environ.get("AZURE_CLIENT_ID")
    )
    client = ResourceManagementClient(
        credential,
        os.environ["AZURE_SUBSCRIPTION_ID"],
    )
 
    rg_name = os.environ["RESOURCE_GROUP_NAME"]
    resources = list(client.resources.list_by_resource_group(rg_name))
 
    logging.info(
        "Audit at %s: found %d resources in %s",
        datetime.now(timezone.utc).isoformat(),
        len(resources),
        rg_name,
    )
 
    for r in resources:
        logging.info("  %s  %s", r.type, r.name)

CLI Tools

argparse - standard library

Python
import argparse
import logging
import sys
 
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Azure resource auditing tool.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--subscription-id", required=True,  help="Azure subscription ID")
    parser.add_argument("--resource-group",  required=True,  help="Resource group name")
    parser.add_argument("--output",          default="table", choices=["table", "json", "csv"])
    parser.add_argument("--verbose", "-v",   action="store_true")
    parser.add_argument("--dry-run",         action="store_true", help="Plan only, no changes")
    return parser.parse_args(argv)
 
def main() -> int:
    args = parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    run(args)
    return 0
 
if __name__ == "__main__":
    sys.exit(main())

Multiline help text with textwrap.dedent():

Python
import argparse
import textwrap
 
parser = argparse.ArgumentParser(
    description="Sort Terraform blocks and regenerate docs.",
    epilog=textwrap.dedent('''\
        Examples:
          python script.py --sort-variables --format-terraform
          python script.py --sort-variables --generate-readme --readme-header-file HEADER.md
    '''),
    formatter_class=argparse.RawDescriptionHelpFormatter,  # preserve newlines in epilog
)
parser.add_argument("--sort-variables", action="store_true")
parser.add_argument("--generate-readme", action="store_true")
parser.add_argument("--readme-header-file", type=str, default="")

dedent() strips the common leading indentation from the triple-quoted epilog, and RawDescriptionHelpFormatter stops argparse from re-wrapping it, so the examples print exactly as written.

typer - CLI from type hints

Python
import typer
from typing import Annotated
 
app = typer.Typer(help="Azure resource auditing tool.")
 
@app.command()
def audit(
    subscription_id: Annotated[str, typer.Option("--subscription-id", "-s", help="Azure subscription ID", envvar="AZURE_SUBSCRIPTION_ID")],
    resource_group:  Annotated[str, typer.Option("--resource-group",  "-g", help="Resource group name")],
    output:          Annotated[str, typer.Option(help="Output format")] = "table",
    verbose:         bool = False,
    dry_run:         bool = typer.Option(False, "--dry-run", help="Plan only, no changes"),
) -> None:
    """Audit resources in a resource group."""
    if verbose:
        typer.echo(f"Auditing {resource_group} in {subscription_id}")
    run_audit(subscription_id, resource_group, output=output, dry_run=dry_run)
 
@app.command()
def deploy(
    environment: Annotated[str, typer.Argument(help="Target environment")],
    confirm:     bool = typer.Option(False, "--yes", "-y"),
) -> None:
    """Deploy infrastructure to an environment."""
    if not confirm:
        typer.confirm(f"Deploy to {environment}?", abort=True)
    do_deploy(environment)
 
if __name__ == "__main__":
    app()

DevOps Utilities

Package management

Bash
# Upgrade all outdated packages
pip list --outdated --format=json | python3 -c \
  "import json,sys; [print(p['name']) for p in json.load(sys.stdin)]" \
  | xargs -n1 pip install --upgrade
 
# Freeze exact versions for reproducible installs
pip freeze > requirements.txt
 
# Compile locked dependencies from pyproject.toml
uv pip compile pyproject.toml -o requirements.txt
uv pip compile pyproject.toml --extra dev -o requirements-dev.txt

Sort variables.tf or outputs.tf alphabetically

Python
import re, argparse
from pathlib import Path
 
 
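def sort_hcl_blocks(content: str, block_type: str) -> str:
    # Regex-based, not a real HCL parser: each block must close with "}" at the start
    # of a line, and anything outside the matched blocks (comments, other block types) is dropped.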
    pattern = re.compile(
        rf'{block_type}\s+"[^"]+"\s+\{{.*?\n}}',
        re.DOTALL,
    )
    blocks = pattern.findall(content)
    blocks.sort(key=lambda b: re.search(rf'{block_type}\s+"([^"]+)"', b).group(1))
    return "\n\n".join(blocks) + "\n"
 
 
def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("file",       help="Path to .tf file")
    parser.add_argument("--type",     default="variable", choices=["variable", "output"])
    parser.add_argument("--in-place", action="store_true")
    args = parser.parse_args()
 
    path    = Path(args.file)
    sorted_ = sort_hcl_blocks(path.read_text(), args.type)
 
    if args.in_place:
        path.write_text(sorted_)
        print(f"Sorted {args.type} blocks in {path}")
    else:
        print(sorted_)
 
 
if __name__ == "__main__":
    main()
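
A quick run on a sample string shows both the sorting and one limitation of the regex approach. The function is repeated so the snippet is self-contained; the sample Terraform content is hypothetical.

```python
import re


def sort_hcl_blocks(content: str, block_type: str) -> str:
    pattern = re.compile(
        rf'{block_type}\s+"[^"]+"\s+\{{.*?\n}}',
        re.DOTALL,
    )
    blocks = pattern.findall(content)
    blocks.sort(key=lambda b: re.search(rf'{block_type}\s+"([^"]+)"', b).group(1))
    return "\n\n".join(blocks) + "\n"


sample = '''# Region settings
variable "zone" {
  type = string
}

variable "alpha" {
  type    = number
  default = 1
}
'''

result = sort_hcl_blocks(sample, "variable")
print(result)
```

Note that the leading comment is not part of any matched block, so it does not appear in the output. Review the diff before using --in-place on files that contain comments or mixed block types.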

Config loader - layered config (file + env overrides)

Python
import os
import yaml
from dataclasses import dataclass, field
from pathlib import Path
 
 
@dataclass
class AppConfig:
    subscription_id: str
    tenant_id:       str
    environment:     str  = "dev"
    location:        str  = "UK South"
    log_level:       str  = "INFO"
    tags:            dict[str, str] = field(default_factory=dict)
 
    @classmethod
    def from_file(cls, path: Path) -> "AppConfig":
        raw = yaml.safe_load(path.read_text()) or {}   # an empty file parses to None
        # Environment variables override file values
        raw["subscription_id"] = os.environ.get("AZURE_SUBSCRIPTION_ID", raw.get("subscription_id", ""))
        raw["tenant_id"]       = os.environ.get("AZURE_TENANT_ID",       raw.get("tenant_id",       ""))
        raw["environment"]     = os.environ.get("ENVIRONMENT",            raw.get("environment",     "dev"))
        return cls(**{k: v for k, v in raw.items() if k in cls.__dataclass_fields__})
 
    @classmethod
    def from_env(cls) -> "AppConfig":
        return cls(
            subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
            tenant_id=os.environ["AZURE_TENANT_ID"],
            environment=os.environ.get("ENVIRONMENT", "dev"),
            location=os.environ.get("AZURE_LOCATION", "UK South"),
            log_level=os.environ.get("LOG_LEVEL", "INFO"),
        )
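
The precedence that from_file implements (environment over file over defaults) can also be expressed with collections.ChainMap, which looks keys up in each mapping in order. This is a sketch with hypothetical sample values, not a replacement for the dataclass above.

```python
from collections import ChainMap

defaults = {"environment": "dev", "location": "UK South", "log_level": "INFO"}
file_values = {"environment": "staging", "location": "West Europe"}   # e.g. from YAML
env_values = {"environment": "prod"}                                   # e.g. from os.environ

# First mapping wins: environment overrides file, file overrides defaults.
settings = ChainMap(env_values, file_values, defaults)

resolved = dict(settings)
```

ChainMap keeps the layers separate, which makes it easy to report where a given value came from when debugging configuration.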

Anti-patterns

  • ⚠️ Bare except: - catches everything derived from BaseException, including KeyboardInterrupt and SystemExit. Always catch a specific type or at minimum except Exception.
  • 🚨 yaml.load() instead of yaml.safe_load() - yaml.load() can execute arbitrary Python via tagged objects. Always use yaml.safe_load() unless you control 100% of the input.
  • 🚨 shell=True in subprocess - expands shell metacharacters and enables injection. Build the command as a list and omit shell=True.
  • ⚠️ print() for diagnostics - bypasses log levels, handlers, and formatters; goes to stdout and mixes with real output. Use logging.getLogger(__name__) in modules.
  • 🔬 f-string in structured log messages - log.info(f"user {uid} logged in") builds the string even when INFO is disabled. Use lazy formatting: log.info("user %s logged in", uid).
  • ⚠️ Mutable default arguments - def process(items=[]) shares the list across all calls. Use None as the default and create the mutable object inside the function.
  • 🔬 Importing * from a module - pollutes the namespace and makes IDE analysis and refactoring unreliable. Always import names explicitly.