Python Standards
An opinionated, production-grade set of standards for writing Python that is consistent, type-safe, observable, and testable. It covers coding style and naming, type hints and static analysis, structured error handling, logging (standard library and logging libraries), OpenTelemetry, shipping telemetry into Azure Monitor, pytest testing, packaging, and CI/CD.
Scope: Python 3.12+, managed with uv, linted with Ruff, type-checked with mypy --strict, and tested with pytest. Examples target service and automation code (CLIs, FastAPI services, Azure-integrated tooling).
Grounding: PEP 8 - Style Guide · PEP 257 - Docstring Conventions · PEP 484 - Type Hints · Logging HOWTO.
Why standards?
Python’s flexibility is a double-edged sword. It will run code with no type hints, let you swallow exceptions silently, and let print() stand in for logging. Standards turn Python into production software rather than a script that happens to work:
- Type hints plus mypy catch whole classes of bugs before runtime and document intent
- A single error-handling discipline means failures are specific, traceable, and never silent
- Structured logging and tracing put automation telemetry alongside everything else in the platform
- A consistent layout and toolchain means any engineer can read, test, and ship any service
Tooling & Versions
| Tool | Purpose | Notes |
|---|---|---|
| uv | Environment + dependency manager | Replaces pip, venv, pip-tools, pyenv |
| Ruff | Linter + formatter | Replaces Flake8, isort, Black, many plugins |
| mypy | Static type checker | Run in --strict mode |
| pytest | Test runner | With pytest-cov, pytest-asyncio |
| pre-commit | Local git hooks | Runs Ruff + mypy before commit |
Rule: Pin the Python version per repository in .python-version and the toolchain in pyproject.toml. CI and developer machines resolve the same versions from the same files - no “works on my machine”.
Project structure - src layout
The src layout prevents the classic bug where tests import the local package directory instead of the installed package, masking packaging errors.
my-service/
├── pyproject.toml          # Single source of truth: deps, build, tool config
├── .python-version         # e.g. 3.12 - read by uv
├── README.md
├── src/
│   └── my_service/
│       ├── __init__.py
│       ├── config.py       # Typed settings loaded from env
│       ├── telemetry.py    # OpenTelemetry + logging setup, called once at startup
│       ├── errors.py       # Custom exception hierarchy
│       └── clients/
│           └── storage.py
└── tests/
    ├── conftest.py         # Shared fixtures
    ├── test_config.py
    └── test_storage.py
pyproject.toml
[project]
name = "my-service"
version = "1.4.0"
requires-python = ">=3.12"
dependencies = [
"httpx>=0.27",
"pydantic>=2.7",
"azure-identity>=1.17",
"azure-monitor-opentelemetry>=1.6",
]
[project.optional-dependencies]
dev = [
"pytest>=8.2",
"pytest-cov>=5.0",
"pytest-asyncio>=0.23",
"pytest-mock>=3.14",
"ruff>=0.5",
"mypy>=1.10",
]
[tool.ruff]
line-length = 100
target-version = "py312"
[tool.ruff.lint]
# A broad, opinionated rule set. E=pycodestyle, F=pyflakes, I=isort, UP=pyupgrade,
# B=bugbear, SIM=simplify, RUF=ruff, ASYNC=async checks, S=bandit security, PTH=use pathlib,
# PL=pylint, C4=comprehensions.
select = ["E", "F", "I", "UP", "B", "SIM", "RUF", "ASYNC", "S", "PTH", "PL", "C4"]
ignore = ["PLR0913"] # "too many arguments" - tune to your codebase
[tool.ruff.lint.per-file-ignores]
"tests/**" = ["S101"] # allow assert in tests (bandit S101)
[tool.mypy]
python_version = "3.12"
strict = true
warn_unreachable = true
warn_redundant_casts = true
disallow_any_explicit = false # tighten over time
[tool.pytest.ini_options]
asyncio_mode = "auto"
addopts = "--strict-markers --cov=my_service --cov-report=term-missing"
uv sync --extra dev   # create .venv and install everything
uv run ruff check .   # lint
uv run ruff format .  # format
uv run mypy src       # type-check
uv run pytest         # test
Coding Style & Naming
Follow PEP 8, enforced by Ruff so style is never a review topic. The conventions that matter most:
| Element | Convention | Example |
|---|---|---|
| Modules / packages | lower_snake_case | storage_client.py |
| Functions / variables | lower_snake_case | def get_blob(...), retry_count |
| Classes / exceptions | PascalCase | class StorageClient, class ConfigError |
| Constants | UPPER_SNAKE_CASE | MAX_RETRIES = 3 |
| Type variables | PascalCase, short | T, KeyT |
| "Internal" names | leading underscore | _parse_internal() |
Style rules
- Type-hint everything public. Every function signature has parameter and return annotations. Use modern syntax: list[str], dict[str, int], str | None (not Optional[str]), built-in generics (no from typing import List).
- Docstrings on every public module, class, and function (PEP 257). One-line summary, then detail. Document what is non-obvious - not what the signature already says.
- f-strings for interpolation, except in logging calls (see Logging).
- pathlib.Path, never os.path string juggling. (Ruff PTH.)
- Absolute imports, grouped and ordered by Ruff’s isort (stdlib, third-party, first-party).
- Prefer dataclasses or Pydantic models over loose dicts for structured data so the shape is typed and validated.
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True, slots=True)
class DeployTarget:
    """An environment a deployment can run against."""

    name: str
    region: str
    subscription_id: str


def load_targets(config_dir: Path) -> list[DeployTarget]:
    """Load deploy targets from every ``*.json`` file in ``config_dir``.

    Raises:
        ConfigError: if a file is missing required fields.
    """
    ...
Type Hints & Static Analysis
mypy --strict is part of the build, not an optional extra. Strict mode forbids implicit Any, untyped function definitions, and unchecked optionals.
from collections.abc import Callable, Iterable
from typing import Protocol


# Protocol - structural typing. Anything with these methods satisfies it,
# no explicit inheritance needed. Prefer over ABCs for "duck-typed" contracts.
class Closeable(Protocol):
    def close(self) -> None: ...


# PEP 695 type-parameter syntax (3.12+) - no module-level TypeVar needed.
def first[T](items: Iterable[T], predicate: Callable[[T], bool]) -> T | None:
    """Return the first item matching ``predicate``, or ``None``."""
    return next((item for item in items if predicate(item)), None)
Rule: Never use bare Any to silence the type checker. Use object and narrow, define a Protocol, or write the real type. Any disables checking for that value and everything it touches downstream. If a third-party library is untyped, add a typed wrapper at the boundary.
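As a minimal sketch of the "object and narrow" alternative to Any, the following wraps json.loads (whose result is effectively untyped) behind a typed boundary. The names parse_port and load_port are hypothetical, chosen only for illustration.

```python
import json


def parse_port(raw: object) -> int:
    """Accept an untrusted value typed as ``object`` and narrow it to ``int``."""
    if isinstance(raw, bool):  # bool is a subclass of int, so reject it explicitly
        raise TypeError("port must be an integer, not a bool")
    if isinstance(raw, int):
        return raw
    if isinstance(raw, str) and raw.isdigit():
        return int(raw)
    raise TypeError(f"cannot interpret {raw!r} as a port")


def load_port(document: str) -> int:
    """Hypothetical typed wrapper: callers never see the untyped JSON value."""
    data: object = json.loads(document)
    if not isinstance(data, dict):
        raise TypeError("expected a JSON object")
    return parse_port(data.get("port"))
```

Because every branch is an isinstance check, the type checker can follow the narrowing, and callers downstream only ever receive an int.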
Error Handling
A typed exception hierarchy
Define a base exception per package and derive specific ones. Callers catch the base to handle “any error from this library”, or a specific subclass to handle a known case.
# errors.py
class ServiceError(Exception):
    """Base class for all errors raised by this service."""


class ConfigError(ServiceError):
    """Configuration is missing or invalid."""


class ResourceNotFoundError(ServiceError):
    def __init__(self, resource_type: str, name: str) -> None:
        self.resource_type = resource_type
        self.name = name
        super().__init__(f"{resource_type} '{name}' not found")
Catch narrowly, re-raise with context
def get_user(user_id: str) -> User:
    try:
        return _db.fetch_user(user_id)
    except KeyError as exc:
        # Translate a low-level error into a domain error, preserving the cause.
        # `raise ... from exc` keeps the original traceback chain.
        raise ResourceNotFoundError("User", user_id) from exc
Rules
- Never use a bare except: - it catches KeyboardInterrupt and SystemExit. Catch a specific type, or except Exception at the outermost boundary only.
- Always raise NewError(...) from original when translating exceptions, so the cause chain is preserved.
- Prefer EAFP over LBYL where it reads clearly - try: value = d[key] rather than if key in d: followed by d[key] (avoids a race and a double lookup).
- Use context managers (with) for every resource - files, locks, clients, spans. Write @contextmanager helpers rather than try/finally boilerplate.
- Do not catch what you cannot handle. Let it propagate to a boundary (the request handler, the CLI entry point, the task runner) that logs once with full context.
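The last rule can be sketched as follows, under the assumption of a CLI-style entry point: inner code raises and stays silent, and a single outermost function logs the failure and maps it to an exit code. ServiceError here is a stand-in for the package base exception, and run_job and main are hypothetical names.

```python
import logging

log = logging.getLogger(__name__)


class ServiceError(Exception):
    """Stand-in for the package base exception."""


def run_job(fail: bool) -> None:
    """Hypothetical unit of work: it raises on failure and does not log."""
    if fail:
        raise ServiceError("storage account unreachable")


def main(fail: bool = False) -> int:
    """Outermost boundary: the one place that logs and chooses the exit code."""
    try:
        run_job(fail)
    except ServiceError:
        log.exception("job failed")  # ERROR level, traceback included
        return 1
    return 0
```

Keeping the log call at the boundary means each failure appears once in the output, with the full cause chain attached.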
import logging
import time
from collections.abc import Iterator
from contextlib import contextmanager

log = logging.getLogger(__name__)


@contextmanager
def timed(label: str) -> Iterator[None]:
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs on success and on failure, so the duration is always logged.
        log.info("%s took %.1fms", label, (time.perf_counter() - start) * 1000)


with timed("deploy"):
    run_deploy()
Aggregating concurrent errors - ExceptionGroup (3.11+)
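import asyncio

import httpx


# `fetch` and `log` are defined elsewhere. The wrapper function (an illustrative name)
# is needed because `async with` is only valid inside a coroutine.
async def fetch_many(urls: list[str]) -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            for url in urls:
                tg.create_task(fetch(url))
    except* TimeoutError as eg:
        log.warning("%d requests timed out", len(eg.exceptions))
    except* httpx.HTTPStatusError as eg:
        for exc in eg.exceptions:
            log.error("HTTP error: %s", exc)
Logging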
Use the standard library logging module. Configure it once, at the application entry point - never in library modules. Library code only ever calls logging.getLogger(__name__) and lets the application decide handlers and levels.
Configuration at the entry point
import logging
import sys
import time


def configure_logging(level: str = "INFO") -> None:
    # The "Z" suffix in datefmt claims UTC, so render timestamps in UTC, not local time.
    logging.Formatter.converter = time.gmtime
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%SZ",
        stream=sys.stdout,
    )
    # Quieten noisy third-party loggers
    logging.getLogger("azure").setLevel(logging.WARNING)
    logging.getLogger("httpx").setLevel(logging.WARNING)


log = logging.getLogger(__name__)
Rules
- Never use print() for diagnostics. It bypasses levels, handlers, and formatters, and writes to stdout mixing with real output. Use a logger.
- Use lazy %s formatting, not f-strings, in log calls. log.info("user %s did %s", uid, action) builds the string only if the level is enabled. log.info(f"user {uid} did {action}") always builds it, even when suppressed.
- Use log.exception(...) inside except blocks - it logs at ERROR with the full traceback automatically. log.error(str(exc)) throws away the stack.
- Never log secrets. Mask tokens, passwords, and connection strings at the call site.
- Log levels mean something: DEBUG local diagnostics; INFO business events; WARNING recoverable/retried; ERROR an operation failed; CRITICAL the process is unhealthy.
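The lazy-formatting rule can be observed directly. This sketch uses a hypothetical Expensive class that counts how often its string form is rendered, and a logger named lazy_demo (also illustrative) with INFO disabled.

```python
import logging


class Expensive:
    """Hypothetical value whose string form is costly; counts each rendering."""

    def __init__(self) -> None:
        self.renders = 0

    def __str__(self) -> str:
        self.renders += 1
        return "expensive-value"


demo_log = logging.getLogger("lazy_demo")
demo_log.setLevel(logging.WARNING)  # INFO is disabled for this logger
demo_log.addHandler(logging.NullHandler())
demo_log.propagate = False

lazy, eager = Expensive(), Expensive()
demo_log.info("value=%s", lazy)  # level check fails first, so __str__ never runs
demo_log.info(f"value={eager}")  # the f-string is evaluated before info() is called
```

After both calls, lazy.renders is 0 and eager.renders is 1: the suppressed f-string call still paid the formatting cost.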
try:
    deploy(target)
except ServiceError:
    log.exception("Deploy failed for target %s", target.name)  # ERROR + traceback
    raise
Structured JSON logging
In containers, Azure Functions, or any environment with a log shipper, emit one JSON object per line. python-json-logger is the no-fuss option.
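For comparison, the one-object-per-line shape can also be sketched with only the standard library, which may help where adding a dependency is not an option. MinimalJsonFormatter is a hypothetical name, and this is a simplified stand-in, not how python-json-logger is implemented.

```python
import json
import logging
from datetime import datetime, timezone


class MinimalJsonFormatter(logging.Formatter):
    """Hypothetical formatter: renders each record as a single JSON line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),  # applies lazy %s arguments
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)  # json.dumps escapes newlines, keeping one line


record = logging.LogRecord(
    name="my_service", level=logging.INFO, pathname="example.py", lineno=1,
    msg="deploy started for %s", args=("prd",), exc_info=None,
)
line = MinimalJsonFormatter().format(record)
```

A dedicated library additionally handles extra fields, field renaming, and non-serialisable values, which this sketch deliberately omits.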
import logging
import sys

from pythonjsonlogger.json import JsonFormatter

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
    JsonFormatter(
        "%(asctime)s %(levelname)s %(name)s %(message)s",
        rename_fields={"asctime": "timestamp", "levelname": "level"},
        timestamp=True,
    )
)
root = logging.getLogger()
root.setLevel(logging.INFO)
root.handlers = [handler]
log = logging.getLogger(__name__)
# `extra` promotes keys into the JSON record - prefer this to interpolating values
log.info("deploy started", extra={"env": "prd", "resource_group": "rg-app"})
Correlation IDs with contextvars
To attach a correlation_id (and trace IDs) to every log line in a request or task scope, use contextvars plus a logging filter. contextvars flows correctly across async boundaries, unlike threading.local.
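A small sketch of why contextvars suits this job: each asyncio task runs in its own copy of the context, so a value set in one task is invisible to the others. The names request_id, handle, and run_all are hypothetical and separate from the filter code in this section.

```python
import asyncio
import contextvars

request_id: contextvars.ContextVar[str] = contextvars.ContextVar("request_id", default="-")


async def handle(name: str) -> str:
    """Hypothetical handler: sets its own ID, yields to the loop, then reads it back."""
    request_id.set(name)
    await asyncio.sleep(0)  # let the other tasks run and set their own values
    return request_id.get()


async def run_all() -> list[str]:
    # gather() wraps each coroutine in a task, and each task copies the current context.
    return list(await asyncio.gather(handle("a"), handle("b"), handle("c")))


seen = asyncio.run(run_all())
```

Each handler reads back the value it set, even though all three interleave on one thread, and the value outside the tasks stays at its default.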
import contextvars
import logging

_correlation_id: contextvars.ContextVar[str] = contextvars.ContextVar("correlation_id", default="-")


class ContextFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = _correlation_id.get()
        return True


# Attach the filter to the handlers, not the root logger: logger-level filters are
# not consulted for records that propagate up from descendant loggers.
for handler in logging.getLogger().handlers:
    handler.addFilter(ContextFilter())
# Include %(correlation_id)s in the format string; JSON formatters that emit extra
# record attributes pick it up automatically.
Logging library - structlog
For richer structured logging without hand-rolling formatters and context plumbing, adopt structlog. It binds context once and renders JSON in production, pretty output in dev, and integrates with the standard library and OpenTelemetry.
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # auto-include bound contextvars
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.dict_tracebacks,
        structlog.processors.JSONRenderer(),
    ],
)
log = structlog.get_logger()
# Bind context once, reuse everywhere - every line carries job_id + tenant
job_log = log.bind(job_id="abc123", tenant="acme")
job_log.info("deploy_started", env="prd")
job_log.warning("retrying", attempt=2)
OpenTelemetry
OpenTelemetry is the standard for traces, metrics, and logs. Python has mature support: an API/SDK, broad auto-instrumentation, and an OTLP exporter.
Zero-code auto-instrumentation
The fastest path - no code changes. Auto-instrumentation wraps common libraries (requests, httpx, FastAPI, psycopg, etc.).
uv run opentelemetry-bootstrap -a install # install instrumentation for detected libs
OTEL_SERVICE_NAME=my-service \
OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 \
OTEL_RESOURCE_ATTRIBUTES=deployment.environment=prd,service.namespace=platform \
uv run opentelemetry-instrument python -m my_service
Manual instrumentation - traces, metrics, logs
Configure providers once at startup (telemetry.py). Use BatchSpanProcessor so spans export in the background and flush on shutdown.
# telemetry.py
import logging

from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def configure_telemetry() -> None:
    resource = Resource.create({"service.name": "my-service", "service.version": "1.4.0"})
    # Traces
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(tracer_provider)
    # Metrics
    reader = PeriodicExportingMetricReader(OTLPMetricExporter())
    metrics.set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))


tracer = trace.get_tracer("my_service")
meter = metrics.get_meter("my_service")
deploy_counter = meter.create_counter("deploys", unit="1", description="Completed deploys")
# Use spans as context managers; record exceptions and set status on failure.
from opentelemetry.trace import Status, StatusCode


def deploy(target: DeployTarget) -> None:
    with tracer.start_as_current_span("deploy") as span:
        span.set_attribute("deploy.environment", target.name)
        try:
            _run(target)
            deploy_counter.add(1, {"environment": target.name, "result": "success"})
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR, str(exc)))
            deploy_counter.add(1, {"environment": target.name, "result": "failure"})
            raise
Rule: Prefer standard OTEL_* environment variables (OTEL_SERVICE_NAME, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_RESOURCE_ATTRIBUTES) over hardcoding endpoints. The same build then exports to any collector or backend by configuration alone. Always shut providers down cleanly (or rely on the SDK’s atexit flush) so the final batch is not lost.
Azure Monitor Telemetry Sync
For Azure-hosted services, the azure-monitor-opentelemetry distro is the supported, batteries-included path. One call wires OpenTelemetry traces, metrics, and logs to Application Insights and turns on auto-instrumentation. (The older opencensus-ext-azure library is deprecated - do not use it for new code.)
from azure.monitor.opentelemetry import configure_azure_monitor
from azure.identity import DefaultAzureCredential
# Reads APPLICATIONINSIGHTS_CONNECTION_STRING from the environment (it identifies the
# resource + ingestion endpoint). Passing a credential makes ingestion authenticate via
# Entra ID rather than trusting the connection string's key - no secret in code.
configure_azure_monitor(credential=DefaultAzureCredential())
# After this call, the standard OpenTelemetry API and the stdlib logging module
# both flow to Application Insights. Use them exactly as above:
import logging
from opentelemetry import trace
log = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("startup"):
    log.info("service started", extra={"env": "prd"})  # appears in App Insights traces table
Rule: Set APPLICATIONINSIGHTS_CONNECTION_STRING from configuration (it identifies the resource and ingestion endpoint) and pass a DefaultAzureCredential so ingestion is authenticated by Entra ID - managed identity on Azure, az login / workload identity locally and in CI - rather than the connection string’s key being the trust boundary. Grant the identity the Monitoring Metrics Publisher role.
Custom logs via the Logs Ingestion API
For arbitrary structured records into a Log Analytics custom table (not application telemetry), use the azure-monitor-ingestion client with a Data Collection Endpoint (DCE) and Data Collection Rule (DCR). This is the supported replacement for the deprecated HTTP Data Collector API.
from datetime import datetime, timezone

from azure.identity import DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient

client = LogsIngestionClient(
    endpoint="https://dce-ldo-uks-prd.uksouth-1.ingest.monitor.azure.com",
    credential=DefaultAzureCredential(),
)
client.upload(
    rule_id="dcr-xxxxxxxxxxxxxxxx",  # DCR immutable ID
    stream_name="Custom-DeployLog_CL",
    logs=[
        {
            "TimeGenerated": datetime.now(timezone.utc).isoformat(),
            "Level": "Information",
            "Message": "Deploy completed",
            "Environment": "prd",
        }
    ],
)
Testing with pytest
Structure and fixtures
# conftest.py - shared fixtures
import pytest
from my_service.config import Config


@pytest.fixture
def config() -> Config:
    return Config(subscription_id="00000000-0000-0000-0000-000000000000", environment="test")

# test_storage.py
import pytest
from my_service.errors import ResourceNotFoundError
from my_service.clients.storage import StorageClient


class TestStorageClient:
    def test_get_blob_returns_content(self, mocker) -> None:
        mock_client = mocker.patch("my_service.clients.storage.BlobServiceClient")
        mock_client.return_value.get_blob_client.return_value.download_blob.return_value.readall.return_value = b"data"
        result = StorageClient("acct").get_blob("container", "name")
        assert result == b"data"

    def test_missing_blob_raises(self, mocker) -> None:
        mocker.patch(
            "my_service.clients.storage.BlobServiceClient",
            side_effect=KeyError("nope"),
        )
        with pytest.raises(ResourceNotFoundError):
            StorageClient("acct").get_blob("container", "missing")
Data-driven tests with parametrize
import pytest
from my_service.regions import to_azure_region


@pytest.mark.parametrize(
    ("code", "expected"),
    [
        ("uks", "uksouth"),
        ("ukw", "ukwest"),
        ("euw", "westeurope"),
    ],
)
def test_region_lookup(code: str, expected: str) -> None:
    assert to_azure_region(code) == expected
Async tests and environment patching
import pytest


@pytest.mark.asyncio
async def test_fetch_all(mocker) -> None:
    mock_get = mocker.patch("httpx.AsyncClient.get")
    mock_get.return_value.json.return_value = {"id": "1"}
    from my_service.http import fetch_all

    results = await fetch_all(["https://api.example.com/a"])
    assert len(results) == 1


def test_config_from_env(monkeypatch) -> None:
    monkeypatch.setenv("AZURE_SUBSCRIPTION_ID", "test-sub")
    from my_service.config import Config

    assert Config().subscription_id == "test-sub"
Testing strategy
| Test type | Tool | Scope | When |
|---|---|---|---|
| Lint / format | Ruff | Every file | Every commit |
| Type check | mypy --strict | Every file | Every commit |
| Unit | pytest + mocks | One unit, no I/O | Every commit |
| Integration | pytest (no mocks) | Real dependencies | PR merge, nightly |
| Coverage | pytest-cov | Whole package | Every commit (fail under threshold) |
Rule: Unit tests mock all I/O - no real network, filesystem, or cloud calls. Patch at the boundary (the client class or function the code calls), not deep inside a third-party library. Enforce a coverage floor in CI (--cov-fail-under=85) so coverage cannot silently erode.
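"Patch at the boundary" can be sketched with the standard library's unittest.mock, which pytest-mock wraps. BlobGateway and read_report are hypothetical names: the gateway stands for the single class that would talk to real storage, and the test replaces that method rather than anything beneath it.

```python
from unittest import mock


class BlobGateway:
    """Hypothetical boundary class: the only place that would perform real I/O."""

    def download(self, container: str, name: str) -> bytes:
        raise RuntimeError("real I/O is not allowed in unit tests")


def read_report(gateway: BlobGateway, name: str) -> str:
    """Code under test: decodes whatever the boundary returns."""
    return gateway.download("reports", name).decode("utf-8")


def test_read_report_decodes_bytes() -> None:
    # Patch the boundary method itself, not the SDK or HTTP layer beneath it.
    with mock.patch.object(BlobGateway, "download", return_value=b"ok") as fake:
        assert read_report(BlobGateway(), "daily.txt") == "ok"
    fake.assert_called_once_with("reports", "daily.txt")
```

The test stays valid if the gateway later swaps its underlying SDK, because nothing in it depends on third-party internals.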
Packaging & Distribution
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project.scripts]
my-service = "my_service.cli:main"  # console entry point
uv build    # build sdist + wheel into dist/
uv publish  # publish to a registry (or Azure Artifacts feed)
| Change | Version bump | Example |
|---|---|---|
| Bug fix, internal refactor | Patch | 1.4.0 → 1.4.1 |
| New backward-compatible feature | Minor | 1.4.0 → 1.5.0 |
| Removed/changed public API, dropped Python version | Major | 1.4.0 → 2.0.0 |
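The bump rules in the table can be expressed as a small function. This is an illustrative sketch only: bump and the Change alias are hypothetical names, and it assumes plain MAJOR.MINOR.PATCH strings with no pre-release or build suffixes.

```python
from typing import Literal

Change = Literal["patch", "minor", "major"]


def bump(version: str, change: Change) -> str:
    """Return the next version for a given kind of change."""
    major, minor, patch = (int(part) for part in version.split("."))
    if change == "major":
        return f"{major + 1}.0.0"  # lower components reset to zero
    if change == "minor":
        return f"{major}.{minor + 1}.0"
    return f"{major}.{minor}.{patch + 1}"
```

For example, bump("1.4.0", "minor") gives "1.5.0", matching the second row of the table.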
Rule: Pin direct dependencies with a lower bound and a compatible upper bound (>=0.27,<1.0), and commit a lockfile (uv.lock) so builds are reproducible. Applications pin tightly; libraries keep ranges wide enough to compose.
CI/CD
Standard stage order
lint (ruff) → type-check (mypy) → test (pytest + coverage) → build → [approval] → publish
GitHub Actions reference
name: Python
on:
  push: { branches: [main] }
  pull_request:
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v3
        with:
          enable-cache: true
      - name: Install
        run: uv sync --extra dev
      - name: Lint
        run: uv run ruff check --output-format=github .
      - name: Format check
        run: uv run ruff format --check .
      - name: Type check
        run: uv run mypy src
      - name: Test
        run: uv run pytest --cov-fail-under=85
Rule: Run ruff format --check (not ruff format) in CI - the formatter fixes locally and via pre-commit, never silently in the pipeline. A formatting drift fails the build and is fixed in a commit, keeping CI deterministic.
Anti-patterns
- 🚨 Bare except: - catches KeyboardInterrupt and SystemExit, hiding shutdown signals and unrelated failures. Catch a specific exception type, or except Exception only at an outermost boundary.
- 🚨 print() for diagnostics - bypasses levels, handlers, formatters, and goes to stdout, mixing with real output. Use a logger from logging.getLogger(__name__).
- 🚨 yaml.load() / pickle.load() / eval() on untrusted input - arbitrary code execution. Use yaml.safe_load(), a vetted serialisation format, and never eval/exec on external data.
- 🚨 subprocess with shell=True and interpolated input - shell injection. Pass an argument list and omit shell=True.
- ⚠️ f-strings in log calls - log.info(f"user {uid}") builds the string even when the level is disabled and loses structured fields. Use lazy %s formatting and extra={}.
- ⚠️ Bare Any to silence mypy - disables type checking for that value and everything downstream. Use object and narrow, a Protocol, or the real type.
- ⚠️ Mutable default arguments - def f(items=[]) shares one list across all calls. Default to None and create the object inside the function.
- ⚠️ Configuring logging inside library modules - libraries call getLogger(__name__) and nothing else; the application owns handlers and levels. A library that calls basicConfig() hijacks the host’s logging.
- ⚠️ Catching an exception only to log.error(str(e)) and continue - loses the traceback and swallows a failure that should propagate. Use log.exception(...) and re-raise, or do not catch it.
- 🔬 Logging secrets - tokens, connection strings, and PII must be masked at the call site, even via structured fields. The telemetry backend is not a secret store.
- 🔬 opencensus-ext-azure for new telemetry - deprecated. Use the azure-monitor-opentelemetry distro.
- 🔬 Flushing telemetry only at process exit in long-running jobs - a crash loses the buffer. Use BatchSpanProcessor / PeriodicExportingMetricReader (background export) and ensure clean shutdown so the final batch flushes.
See Also
- PEP 8 - Style Guide for Python Code
- PEP 257 - Docstring Conventions
- PEP 484 - Type Hints
- Python Logging HOWTO
- Ruff - linter and formatter
- mypy - static type checker
- uv - Python package and project manager
- pytest documentation
- structlog
- OpenTelemetry Python
- Azure Monitor OpenTelemetry distro for Python
- Azure Monitor Logs Ingestion API
- Python Cheatsheet - quick-reference patterns