Python Standards

An opinionated, production-grade set of standards for writing Python that is consistent, type-safe, observable, and testable. It covers coding style and naming, type hints and static analysis, structured error handling, logging (standard library and logging libraries), OpenTelemetry, shipping telemetry into Azure Monitor, pytest testing, packaging, and CI/CD.

Scope: Python 3.12+, managed with uv, linted with Ruff, type-checked with mypy --strict, and tested with pytest. Examples target service and automation code (CLIs, FastAPI services, Azure-integrated tooling).

Grounding: PEP 8 - Style Guide · PEP 257 - Docstring Conventions · PEP 484 - Type Hints · Logging HOWTO.


Why standards?

Python’s flexibility is a double-edged sword. It will run code with no type hints, let you swallow exceptions silently, and let print() stand in for logging. Standards turn Python code into production software rather than a script that happens to work:

  • Type hints plus mypy catch whole classes of bugs before runtime and document intent
  • A single error-handling discipline means failures are specific, traceable, and never silent
  • Structured logging and tracing put automation telemetry alongside everything else in the platform
  • A consistent layout and toolchain means any engineer can read, test, and ship any service

Tooling & Versions

| Tool | Purpose | Notes |
| --- | --- | --- |
| uv | Environment + dependency manager | Replaces pip, venv, pip-tools, pyenv |
| Ruff | Linter + formatter | Replaces Flake8, isort, Black, many plugins |
| mypy | Static type checker | Run in --strict mode |
| pytest | Test runner | With pytest-cov, pytest-asyncio |
| pre-commit | Local git hooks | Runs Ruff + mypy before commit |

Rule: Pin the Python version per repository in .python-version and the toolchain in pyproject.toml. CI and developer machines resolve the same versions from the same files - no “works on my machine”.

Project structure - src layout

The src layout prevents the classic bug where tests import the local package directory instead of the installed package, masking packaging errors.

PLAINTEXT
my-service/
├── pyproject.toml          # Single source of truth: deps, build, tool config
├── .python-version         # e.g. 3.12 - read by uv
├── README.md
├── src/
│   └── my_service/
│       ├── __init__.py
│       ├── config.py        # Typed settings loaded from env
│       ├── telemetry.py     # OpenTelemetry + logging setup, called once at startup
│       ├── errors.py        # Custom exception hierarchy
│       └── clients/
│           └── storage.py
└── tests/
    ├── conftest.py          # Shared fixtures
    ├── test_config.py
    └── test_storage.py

pyproject.toml

TOML
[project]
name = "my-service"
version = "1.4.0"
requires-python = ">=3.12"
dependencies = [
    "httpx>=0.27",
    "pydantic>=2.7",
    "azure-identity>=1.17",
    "azure-monitor-opentelemetry>=1.6",
]
 
[project.optional-dependencies]
dev = [
    "pytest>=8.2",
    "pytest-cov>=5.0",
    "pytest-asyncio>=0.23",
    "pytest-mock>=3.14",
    "ruff>=0.5",
    "mypy>=1.10",
]
 
[tool.ruff]
line-length = 100
target-version = "py312"
 
[tool.ruff.lint]
# A broad, opinionated rule set. E=pycodestyle, F=pyflakes, I=isort, UP=pyupgrade,
# B=bugbear, SIM=simplify, RUF=ruff, ASYNC, S=bandit security, PTH=use pathlib,
# PL=pylint, C4=comprehensions.
select = ["E", "F", "I", "UP", "B", "SIM", "RUF", "ASYNC", "S", "PTH", "PL", "C4"]
ignore = ["PLR0913"]   # "too many arguments" - tune to your codebase
 
[tool.ruff.lint.per-file-ignores]
"tests/**" = ["S101"]   # allow assert in tests (bandit S101)
 
[tool.mypy]
python_version = "3.12"
strict = true
warn_unreachable = true
warn_redundant_casts = true
disallow_any_explicit = false   # tighten over time
 
[tool.pytest.ini_options]
asyncio_mode = "auto"
addopts = "--strict-markers --cov=my_service --cov-report=term-missing"

Bash
uv sync --extra dev        # create .venv and install everything
uv run ruff check .        # lint
uv run ruff format .       # format
uv run mypy src            # type-check
uv run pytest              # test

Coding Style & Naming

Follow PEP 8, enforced by Ruff so style is never a review topic. The conventions that matter most:

| Element | Convention | Example |
| --- | --- | --- |
| Modules / packages | lower_snake_case | storage_client.py |
| Functions / variables | lower_snake_case | def get_blob(...), retry_count |
| Classes / exceptions | PascalCase | class StorageClient, class ConfigError |
| Constants | UPPER_SNAKE_CASE | MAX_RETRIES = 3 |
| Type variables | PascalCase, short | T, KeyT |
| “Internal” names | leading underscore | _parse_internal() |

Style rules

  • Type-hint everything public. Every function signature has parameter and return annotations. Use modern syntax: list[str], dict[str, int], str | None (not Optional[str]), built-in generics (no from typing import List).
  • Docstrings on every public module, class, and function (PEP 257). One-line summary, then detail. Document what is non-obvious - not what the signature already says.
  • f-strings for interpolation, except in logging calls (see Logging).
  • pathlib.Path, never os.path string juggling. (Ruff PTH.)
  • Absolute imports, grouped and ordered by Ruff’s isort (stdlib, third-party, first-party).
  • Prefer dataclasses or Pydantic models over loose dicts for structured data so the shape is typed and validated.

Python
from dataclasses import dataclass
from pathlib import Path
 
 
@dataclass(frozen=True, slots=True)
class DeployTarget:
    """An environment a deployment can run against."""
 
    name: str
    region: str
    subscription_id: str
 
 
def load_targets(config_dir: Path) -> list[DeployTarget]:
    """Load deploy targets from every ``*.json`` file in ``config_dir``.
 
    Raises:
        ConfigError: if a file is missing required fields.
    """
    ...

Type Hints & Static Analysis

mypy --strict is part of the build, not an optional extra. Strict mode forbids implicit Any, untyped function definitions, and unchecked optionals.

Python
from collections.abc import Callable, Iterable
from typing import Protocol
 
 
# Protocol - structural typing. Anything with these methods satisfies it,
# no explicit inheritance needed. Prefer over ABCs for "duck-typed" contracts.
class Closeable(Protocol):
    def close(self) -> None: ...
 
 
# PEP 695 type-parameter syntax (3.12+) - no module-level TypeVar needed.
def first[T](items: Iterable[T], predicate: Callable[[T], bool]) -> T | None:
    """Return the first item matching ``predicate``, or ``None``."""
    return next((item for item in items if predicate(item)), None)

Rule: Never use bare Any to silence the type checker. Use object and narrow, define a Protocol, or write the real type. Any disables checking for that value and everything it touches downstream. If a third-party library is untyped, add a typed wrapper at the boundary.
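
A minimal sketch of the "use object and narrow" option. The `parse_port` helper is hypothetical; it assumes a value of unknown type arriving from, say, decoded JSON, and lets `isinstance` checks do the narrowing that `Any` would skip.

```python
def parse_port(raw: object) -> int:
    """Return ``raw`` as a TCP port, narrowing from ``object`` step by step."""
    # bool is a subclass of int, so reject it explicitly before the int check.
    if isinstance(raw, bool):
        raise TypeError("port must be an int or numeric string, not bool")
    if isinstance(raw, int):
        port = raw
    elif isinstance(raw, str) and raw.isdecimal():
        port = int(raw)
    else:
        raise TypeError(f"port must be an int or numeric string, got {type(raw).__name__}")
    if not 1 <= port <= 65535:
        raise ValueError(f"port out of range: {port}")
    return port
```

Because the parameter is `object` rather than `Any`, the type checker rejects any use of `raw` that has not first been narrowed.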


Error Handling

A typed exception hierarchy

Define a base exception per package and derive specific ones. Callers catch the base to handle “any error from this library”, or a specific subclass to handle a known case.

Python
# errors.py
class ServiceError(Exception):
    """Base class for all errors raised by this service."""
 
 
class ConfigError(ServiceError):
    """Configuration is missing or invalid."""
 
 
class ResourceNotFoundError(ServiceError):
    def __init__(self, resource_type: str, name: str) -> None:
        self.resource_type = resource_type
        self.name = name
        super().__init__(f"{resource_type} '{name}' not found")
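
A sketch of how a caller might use this hierarchy, with the classes repeated so the snippet runs on its own. `describe_failure` and the sample values are hypothetical and only illustrate the ordering of `except` clauses.

```python
class ServiceError(Exception):
    """Base class for all errors raised by this service."""


class ConfigError(ServiceError):
    """Configuration is missing or invalid."""


class ResourceNotFoundError(ServiceError):
    def __init__(self, resource_type: str, name: str) -> None:
        self.resource_type = resource_type
        self.name = name
        super().__init__(f"{resource_type} '{name}' not found")


def describe_failure(exc: Exception) -> str:
    """Map an exception to a short outcome label (hypothetical helper)."""
    try:
        raise exc
    except ResourceNotFoundError as err:
        # Specific subclass first: its structured fields are available here.
        return f"missing:{err.resource_type}:{err.name}"
    except ServiceError:
        # Base class: any other error raised by this package.
        return "service-error"
```

Anything that is not a `ServiceError` propagates unchanged, which matches the rule of not catching what you cannot handle.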

Catch narrowly, re-raise with context

Python
def get_user(user_id: str) -> User:
    try:
        return _db.fetch_user(user_id)
    except KeyError as exc:
        # Translate a low-level error into a domain error, preserving the cause.
        # `raise ... from exc` keeps the original traceback chain.
        raise ResourceNotFoundError("User", user_id) from exc

Rules

  • Never use a bare except: - it catches KeyboardInterrupt and SystemExit. Catch a specific type, or except Exception at the outermost boundary only.
  • Always raise NewError(...) from original when translating exceptions, so the cause chain is preserved.
  • Prefer EAFP over LBYL where it reads clearly - try: value = d[key] rather than if key in d: followed by d[key] (avoids a race and a double lookup).
  • Use context managers (with) for every resource - files, locks, clients, spans. Write @contextmanager helpers rather than try/finally boilerplate.
  • Do not catch what you cannot handle. Let it propagate to a boundary (the request handler, the CLI entry point, the task runner) that logs once with full context.

Python
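import logging
import time
from collections.abc import Iterator
from contextlib import contextmanager

log = logging.getLogger(__name__)


@contextmanager
def timed(label: str) -> Iterator[None]:
    """Log how long the enclosed block took, even if it raises."""
    start = time.perf_counter()
    try:
        yield
    finally:
        log.info("%s took %.1fms", label, (time.perf_counter() - start) * 1000)


with timed("deploy"):
    run_deploy()  # the application's own deploy entry point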
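
A second sketch of the `@contextmanager` rule, this time for state that must be restored. `override_env` is a hypothetical helper, not part of the standard library; it sets an environment variable for the duration of a block and puts the previous state back in `finally`, including when the body raises.

```python
import os
from collections.abc import Iterator
from contextlib import contextmanager


@contextmanager
def override_env(name: str, value: str) -> Iterator[None]:
    """Temporarily set an environment variable (hypothetical helper)."""
    previous = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions alike.
        if previous is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous
```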

Aggregating concurrent errors - ExceptionGroup (3.11+)

Python
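import asyncio
import logging

import httpx

log = logging.getLogger(__name__)


async def fetch_all(urls: list[str]) -> None:
    # `async with` is only valid inside a coroutine; `fetch` is the application's own coroutine.
    try:
        async with asyncio.TaskGroup() as tg:
            for url in urls:
                tg.create_task(fetch(url))
    except* TimeoutError as eg:
        log.warning("%d requests timed out", len(eg.exceptions))
    except* httpx.HTTPStatusError as eg:
        for exc in eg.exceptions:
            log.error("HTTP error: %s", exc)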

Logging

Use the standard library logging module. Configure it once, at the application entry point - never in library modules. Library code only ever calls logging.getLogger(__name__) and lets the application decide handlers and levels.

Configuration at the entry point

Python
import logging
import sys
import time


def configure_logging(level: str = "INFO") -> None:
    # The date format below ends in "Z", so render timestamps in UTC, not local time.
    logging.Formatter.converter = time.gmtime
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%SZ",
        stream=sys.stdout,
    )
    # Quieten noisy third-party loggers
    logging.getLogger("azure").setLevel(logging.WARNING)
    logging.getLogger("httpx").setLevel(logging.WARNING)


log = logging.getLogger(__name__)

Rules

  • Never use print() for diagnostics. It bypasses levels, handlers, and formatters, and writes to stdout mixing with real output. Use a logger.
  • Use lazy %s formatting, not f-strings, in log calls. log.info("user %s did %s", uid, action) builds the string only if the level is enabled. log.info(f"user {uid} did {action}") always builds it, even when suppressed.
  • Use log.exception(...) inside except blocks - it logs at ERROR with the full traceback automatically. log.error(str(exc)) throws away the stack.
  • Never log secrets. Mask tokens, passwords, and connection strings at the call site.
  • Log levels mean something: DEBUG local diagnostics; INFO business events; WARNING recoverable/retried; ERROR an operation failed; CRITICAL the process is unhealthy.

Python
try:
    deploy(target)
except ServiceError:
    log.exception("Deploy failed for target %s", target.name)   # ERROR + traceback
    raise
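
The lazy-formatting rule can be observed with a small experiment. `Expensive` is a hypothetical class that counts how often it is rendered to a string; the logger name `lazy_demo` is likewise only for illustration.

```python
import logging


class Expensive:
    """Counts how often it is rendered to a string."""

    def __init__(self) -> None:
        self.renders = 0

    def __str__(self) -> str:
        self.renders += 1
        return "expensive-value"


demo_log = logging.getLogger("lazy_demo")
demo_log.addHandler(logging.NullHandler())
demo_log.setLevel(logging.INFO)      # DEBUG is disabled
demo_log.propagate = False           # keep the demo self-contained

lazy, eager = Expensive(), Expensive()
demo_log.debug("value=%s", lazy)     # suppressed, so the argument is never formatted
demo_log.debug(f"value={eager}")     # the f-string is built before debug() is called
```

After this runs, `lazy.renders` is 0 and `eager.renders` is 1: the f-string paid the formatting cost for a message that was never emitted.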

Structured JSON logging

In containers, Azure Functions, or any environment with a log shipper, emit one JSON object per line. python-json-logger is the no-fuss option.

Python
import logging
import sys
from pythonjsonlogger.json import JsonFormatter

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
    JsonFormatter(
        "%(levelname)s %(name)s %(message)s",
        rename_fields={"levelname": "level"},
        timestamp=True,   # adds a "timestamp" field itself, so %(asctime)s is not renamed onto the same key
    )
)
root = logging.getLogger()
root.setLevel(logging.INFO)
root.handlers = [handler]

log = logging.getLogger(__name__)

# `extra` promotes keys into the JSON record - prefer this to interpolating values
log.info("deploy started", extra={"env": "prd", "resource_group": "rg-app"})

Correlation IDs with contextvars

To attach a correlation_id (and trace IDs) to every log line in a request or task scope, use contextvars plus a logging filter. contextvars flows correctly across async boundaries, unlike threading.local.

Python
import contextvars
import logging
 
_correlation_id: contextvars.ContextVar[str] = contextvars.ContextVar("correlation_id", default="-")
 
 
class ContextFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = _correlation_id.get()
        return True
 
 
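# Attach the filter to the handlers, not to the root logger: logger-level filters are
# skipped for records that propagate up from child loggers.
for handler in logging.getLogger().handlers:
    handler.addFilter(ContextFilter())
# Include %(correlation_id)s in the format string; JSON formatters pick the attribute up automatically.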

Logging library - structlog

For richer structured logging without hand-rolling formatters and context plumbing, adopt structlog. It binds context once and renders JSON in production, pretty output in dev, and integrates with the standard library and OpenTelemetry.

Python
import structlog
 
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,        # auto-include bound contextvars
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.dict_tracebacks,
        structlog.processors.JSONRenderer(),
    ],
)
 
log = structlog.get_logger()
 
# Bind context once, reuse everywhere - every line carries job_id + tenant
job_log = log.bind(job_id="abc123", tenant="acme")
job_log.info("deploy_started", env="prd")
job_log.warning("retrying", attempt=2)

OpenTelemetry

OpenTelemetry is the standard for traces, metrics, and logs. Python has mature support: an API/SDK, broad auto-instrumentation, and an OTLP exporter.

Zero-code auto-instrumentation

The fastest path - no code changes. Auto-instrumentation wraps common libraries (requests, httpx, FastAPI, psycopg, etc.).

Bash
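# Install instrumentation for detected libs. `-a install` shells out to pip, which
# uv-managed environments do not include, so pipe the requirements to uv instead.
uv run opentelemetry-bootstrap -a requirements | uv pip install --requirement -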
OTEL_SERVICE_NAME=my-service \
OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 \
OTEL_RESOURCE_ATTRIBUTES=deployment.environment=prd,service.namespace=platform \
  uv run opentelemetry-instrument python -m my_service

Manual instrumentation - traces, metrics, logs

Configure providers once at startup (telemetry.py). Use BatchSpanProcessor so spans export in the background and flush on shutdown.

Python
# telemetry.py
import logging
 
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
 
 
def configure_telemetry() -> None:
    resource = Resource.create({"service.name": "my-service", "service.version": "1.4.0"})
 
    # Traces
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(tracer_provider)
 
    # Metrics
    reader = PeriodicExportingMetricReader(OTLPMetricExporter())
    metrics.set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))
 
 
tracer = trace.get_tracer("my_service")
meter = metrics.get_meter("my_service")
deploy_counter = meter.create_counter("deploys", unit="1", description="Completed deploys")

Python
# Use spans as context managers; record exceptions and set status on failure.
from opentelemetry.trace import Status, StatusCode
 
 
def deploy(target: DeployTarget) -> None:
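    # The manual handling below replaces the context manager's automatic exception
    # recording, so switch that off to avoid a duplicate exception event on the span.
    with tracer.start_as_current_span(
        "deploy", record_exception=False, set_status_on_exception=False
    ) as span: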
        span.set_attribute("deploy.environment", target.name)
        try:
            _run(target)
            deploy_counter.add(1, {"environment": target.name, "result": "success"})
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR, str(exc)))
            deploy_counter.add(1, {"environment": target.name, "result": "failure"})
            raise

Rule: Prefer standard OTEL_* environment variables (OTEL_SERVICE_NAME, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_RESOURCE_ATTRIBUTES) over hardcoding endpoints. The same build then exports to any collector or backend by configuration alone. Always shut providers down cleanly (or rely on the SDK’s atexit flush) so the final batch is not lost.


Azure Monitor Telemetry Sync

For Azure-hosted services, the azure-monitor-opentelemetry distro is the supported, batteries-included path. One call wires OpenTelemetry traces, metrics, and logs to Application Insights and turns on auto-instrumentation. (The older opencensus-ext-azure library is deprecated - do not use it for new code.)

Python
from azure.monitor.opentelemetry import configure_azure_monitor
from azure.identity import DefaultAzureCredential
 
# Reads APPLICATIONINSIGHTS_CONNECTION_STRING from the environment (it identifies the
# resource + ingestion endpoint). Passing a credential makes ingestion authenticate via
# Entra ID rather than trusting the connection string's key - no secret in code.
configure_azure_monitor(credential=DefaultAzureCredential())
 
# After this call, the standard OpenTelemetry API and the stdlib logging module
# both flow to Application Insights. Use them exactly as above:
import logging
from opentelemetry import trace
 
log = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)
 
with tracer.start_as_current_span("startup"):
    log.info("service started", extra={"env": "prd"})   # appears in App Insights traces table

Rule: Set APPLICATIONINSIGHTS_CONNECTION_STRING from configuration (it identifies the resource and ingestion endpoint) and pass a DefaultAzureCredential so ingestion is authenticated by Entra ID - managed identity on Azure, az login / workload identity locally and in CI - rather than the connection string’s key being the trust boundary. Grant the identity the Monitoring Metrics Publisher role.

Custom logs via the Logs Ingestion API

For arbitrary structured records into a Log Analytics custom table (not application telemetry), use the azure-monitor-ingestion client with a Data Collection Endpoint (DCE) and Data Collection Rule (DCR). This is the supported replacement for the deprecated HTTP Data Collector API.

Python
from azure.identity import DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient
from datetime import datetime, timezone
 
client = LogsIngestionClient(
    endpoint="https://dce-ldo-uks-prd.uksouth-1.ingest.monitor.azure.com",
    credential=DefaultAzureCredential(),
)
 
client.upload(
    rule_id="dcr-xxxxxxxxxxxxxxxx",          # DCR immutable ID
    stream_name="Custom-DeployLog_CL",
    logs=[
        {
            "TimeGenerated": datetime.now(timezone.utc).isoformat(),
            "Level": "Information",
            "Message": "Deploy completed",
            "Environment": "prd",
        }
    ],
)

Testing with pytest

Structure and fixtures

Python
# conftest.py - shared fixtures
import pytest
from my_service.config import Config
 
 
@pytest.fixture
def config() -> Config:
    return Config(subscription_id="00000000-0000-0000-0000-000000000000", environment="test")

Python
# test_storage.py
import pytest
from my_service.errors import ResourceNotFoundError
from my_service.clients.storage import StorageClient
 
 
class TestStorageClient:
    def test_get_blob_returns_content(self, mocker) -> None:
        mock_client = mocker.patch("my_service.clients.storage.BlobServiceClient")
        mock_client.return_value.get_blob_client.return_value.download_blob.return_value.readall.return_value = b"data"
 
        result = StorageClient("acct").get_blob("container", "name")
 
        assert result == b"data"
 
    def test_missing_blob_raises(self, mocker) -> None:
        mocker.patch(
            "my_service.clients.storage.BlobServiceClient",
            side_effect=KeyError("nope"),
        )
        with pytest.raises(ResourceNotFoundError):
            StorageClient("acct").get_blob("container", "missing")

Data-driven tests with parametrize

Python
import pytest
from my_service.regions import to_azure_region
 
 
@pytest.mark.parametrize(
    ("code", "expected"),
    [
        ("uks", "uksouth"),
        ("ukw", "ukwest"),
        ("euw", "westeurope"),
    ],
)
def test_region_lookup(code: str, expected: str) -> None:
    assert to_azure_region(code) == expected
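
The source does not show `my_service.regions`, so the following is only one possible shape for the function under test. The mapping holds just the three codes used above, and raising `ValueError` for unknown codes is an assumption; a real service might raise its own `ConfigError` instead.

```python
_REGIONS: dict[str, str] = {
    "uks": "uksouth",
    "ukw": "ukwest",
    "euw": "westeurope",
}


def to_azure_region(code: str) -> str:
    """Translate a short region code into an Azure region name."""
    try:
        return _REGIONS[code.lower()]
    except KeyError as exc:
        raise ValueError(f"unknown region code: {code!r}") from exc
```

A pure lookup like this is a natural fit for `parametrize`: adding a region means adding one row to the test table, not another test function.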

Async tests and environment patching

Python
import pytest
 
 
@pytest.mark.asyncio  # optional here: asyncio_mode = "auto" already collects async tests
async def test_fetch_all(mocker) -> None:
    mock_get = mocker.patch("httpx.AsyncClient.get")
    mock_get.return_value.json.return_value = {"id": "1"}
 
    from my_service.http import fetch_all
    results = await fetch_all(["https://api.example.com/a"])
    assert len(results) == 1
 
 
def test_config_from_env(monkeypatch) -> None:
    monkeypatch.setenv("AZURE_SUBSCRIPTION_ID", "test-sub")
    from my_service.config import Config
    assert Config().subscription_id == "test-sub"

Testing strategy

| Test type | Tool | Scope | When |
| --- | --- | --- | --- |
| Lint / format | Ruff | Every file | Every commit |
| Type check | mypy --strict | Every file | Every commit |
| Unit | pytest + mocks | One unit, no I/O | Every commit |
| Integration | pytest (no mocks) | Real dependencies | PR merge, nightly |
| Coverage | pytest-cov | Whole package | Every commit (fail under threshold) |

Rule: Unit tests mock all I/O - no real network, filesystem, or cloud calls. Patch at the boundary (the client class or function the code calls), not deep inside a third-party library. Enforce a coverage floor in CI (--cov-fail-under=85) so coverage cannot silently erode.
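
A sketch of patching at the boundary. It uses the standard library's `unittest.mock` directly rather than the `mocker` fixture, so it runs without plugins; `StorageClient` and `blob_size` here are hypothetical stand-ins, not the real service code.

```python
from unittest.mock import Mock


class StorageClient:
    """Hypothetical boundary class; the real one would call the Azure SDK."""

    def get_blob(self, container: str, name: str) -> bytes:
        raise NotImplementedError("real network call")


def blob_size(client: StorageClient, container: str, name: str) -> int:
    """Code under test: depends only on the boundary, not on the SDK behind it."""
    return len(client.get_blob(container, name))


def test_blob_size() -> None:
    fake = Mock(spec=StorageClient)      # spec rejects attributes the real class lacks
    fake.get_blob.return_value = b"data"

    assert blob_size(fake, "container", "name") == 4
    fake.get_blob.assert_called_once_with("container", "name")
```

Replacing the one method the code calls keeps the test short and immune to changes in the SDK's internal call chain.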


Packaging & Distribution

TOML
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
 
[project.scripts]
my-service = "my_service.cli:main"    # console entry point

Bash
uv build                              # build sdist + wheel into dist/
uv publish                            # publish to a registry (or Azure Artifacts feed)

| Change | Version bump | Example |
| --- | --- | --- |
| Bug fix, internal refactor | Patch | 1.4.0 → 1.4.1 |
| New backward-compatible feature | Minor | 1.4.0 → 1.5.0 |
| Removed/changed public API, dropped Python version | Major | 1.4.0 → 2.0.0 |

Rule: Pin direct dependencies with a lower bound and a compatible upper bound (>=0.27,<1.0), and commit a lockfile (uv.lock) so builds are reproducible. Applications pin tightly; libraries keep ranges wide enough to compose.
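
The bump rules in the table above reduce to a few lines of arithmetic. A hypothetical `bump` helper, assuming plain `MAJOR.MINOR.PATCH` strings with no pre-release or build suffix:

```python
from typing import Literal

Change = Literal["patch", "minor", "major"]


def bump(version: str, change: Change) -> str:
    """Return the next version for a ``MAJOR.MINOR.PATCH`` string."""
    major, minor, patch = (int(part) for part in version.split("."))
    if change == "major":
        return f"{major + 1}.0.0"        # lower components reset to zero
    if change == "minor":
        return f"{major}.{minor + 1}.0"
    return f"{major}.{minor}.{patch + 1}"
```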


CI/CD

Standard stage order

PLAINTEXT
lint (ruff) → type-check (mypy) → test (pytest + coverage) → build → [approval] → publish

GitHub Actions reference

YAML
name: Python
 
on:
  push: { branches: [main] }
  pull_request:
 
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v3
        with:
          enable-cache: true
 
      - name: Install
        run: uv sync --extra dev
 
      - name: Lint
        run: uv run ruff check --output-format=github .
 
      - name: Format check
        run: uv run ruff format --check .
 
      - name: Type check
        run: uv run mypy src
 
      - name: Test
        run: uv run pytest --cov-fail-under=85

Rule: Run ruff format --check (not ruff format) in CI - the formatter fixes locally and via pre-commit, never silently in the pipeline. A formatting drift fails the build and is fixed in a commit, keeping CI deterministic.


Anti-patterns

  • 🚨 Bare except: - catches KeyboardInterrupt and SystemExit, hiding shutdown signals and unrelated failures. Catch a specific exception type, or except Exception only at an outermost boundary.
  • 🚨 print() for diagnostics - bypasses levels, handlers, formatters, and goes to stdout, mixing with real output. Use a logger from logging.getLogger(__name__).
  • 🚨 yaml.load() / pickle.load() / eval() on untrusted input - arbitrary code execution. Use yaml.safe_load(), a vetted serialisation format, and never eval/exec on external data.
  • 🚨 subprocess with shell=True and interpolated input - shell injection. Pass an argument list and omit shell=True.
  • ⚠️ f-strings in log calls - log.info(f"user {uid}") builds the string even when the level is disabled and loses structured fields. Use lazy %s formatting and extra={}.
  • ⚠️ Bare Any to silence mypy - disables type checking for that value and everything downstream. Use object and narrow, a Protocol, or the real type.
  • ⚠️ Mutable default arguments - def f(items=[]) shares one list across all calls. Default to None and create the object inside the function.
  • ⚠️ Configuring logging inside library modules - libraries call getLogger(__name__) and nothing else; the application owns handlers and levels. A library that calls basicConfig() hijacks the host’s logging.
  • ⚠️ Catching an exception only to log.error(str(e)) and continue - loses the traceback and swallows a failure that should propagate. Use log.exception(...) and re-raise, or do not catch it.
  • 🔬 Logging secrets - tokens, connection strings, and PII must be masked at the call site, even via structured fields. The telemetry backend is not a secret store.
  • 🔬 opencensus-ext-azure for new telemetry - deprecated. Use the azure-monitor-opentelemetry distro.
  • 🔬 Flushing telemetry only at process exit in long-running jobs - a crash loses the buffer. Use BatchSpanProcessor/PeriodicExportingMetricReader (background export) and ensure clean shutdown so the final batch flushes.
