Python Cheat Sheet

Activate a venv with handy functions

#!/usr/bin/env bash

# Define the mkvenv function
function mkvenv() {
  # Create a virtual environment under ~/.virtualenvs named after the current
  # directory. Expansions are quoted so directory names containing spaces or
  # glob characters are handled correctly.
  local curdir
  curdir=$(basename -- "$(pwd)")
  mkdir -p ~/.virtualenvs
  # Only report success when venv creation actually succeeded.
  python3 -m venv ~/.virtualenvs/"${curdir}" || return
  echo "Virtual environment for ${curdir} created"
}

# Define the avenv function
function avenv() {
  # Activate the virtual environment in ~/.virtualenvs that is named after the
  # current directory. Returns non-zero (and reports on stderr) when no such
  # environment exists, so callers can chain on the result.
  local curdir
  curdir=$(basename -- "$(pwd)")
  if [ -d ~/.virtualenvs/"${curdir}" ]; then
    source ~/.virtualenvs/"${curdir}"/bin/activate || return
    echo "Virtual environment for ${curdir} activated"
  else
    echo "Error: No virtual environment found for ${curdir}" >&2
    return 1
  fi
}

# Append the functions to your .bashrc file.
# Each function is appended only if it is not already defined there, so
# re-running this script does not pile up duplicate definitions.
echo "Appending functions to .bashrc"
for fn in mkvenv avenv; do
  # `declare -f` prints a definition starting with "name () ", so that prefix
  # at the start of a line means the function was appended previously.
  if grep -qs "^${fn} ()" ~/.bashrc; then
    echo "${fn} is already defined in .bashrc, skipping"
    continue
  fi
  {
    echo ""
    echo "# Define ${fn} function"
    declare -f "${fn}"
    echo ""
  } >> ~/.bashrc
done

Authenticate to Azure using environment variables

import os
from azure.identity import ClientSecretCredential

def azure_authenticate():
    """Return a ClientSecretCredential built from the ARM_* environment variables.

    Reads ARM_CLIENT_ID, ARM_CLIENT_SECRET and ARM_TENANT_ID (in that order);
    a missing variable raises KeyError.
    """
    env = os.environ
    return ClientSecretCredential(
        client_id=env["ARM_CLIENT_ID"],
        client_secret=env["ARM_CLIENT_SECRET"],
        tenant_id=env["ARM_TENANT_ID"],
    )

Authenticate to Azure, set up a class, instantiate the class and get a resource id

import os
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient


def azure_authenticate():
    """Create the service-principal credential used by the Azure clients.

    The client id, client secret and tenant id are taken from the
    ARM_CLIENT_ID, ARM_CLIENT_SECRET and ARM_TENANT_ID environment variables;
    KeyError is raised if any of them is not set.
    """
    principal = {
        "client_id": os.environ["ARM_CLIENT_ID"],
        "client_secret": os.environ["ARM_CLIENT_SECRET"],
        "tenant_id": os.environ["ARM_TENANT_ID"],
    }
    return ClientSecretCredential(**principal)


class CheckAzure:
    """Report the Key Vaults found in a fixed resource group."""

    @staticmethod
    def check_resources():
        """Print the name and vault URI of every Key Vault in the resource group.

        The subscription is read from the ARM_SUBSCRIPTION_ID environment
        variable (KeyError if unset) and the credential comes from
        ``azure_authenticate()``. Nothing is returned; results are printed.
        """
        subscription_id = os.environ["ARM_SUBSCRIPTION_ID"]
        resource_group_name = "rg-ldo-euw-dev-mgt"
        resource_type = "Microsoft.KeyVault/vaults"
        api_version = "2022-11-01"
        arm_client = ResourceManagementClient(azure_authenticate(), subscription_id)

        # Collect every resource in the group whose type matches. The report
        # is produced once, after the listing is complete, so each resource is
        # printed exactly once and the "no resources" case is reachable.
        resources = [
            resource
            for resource in arm_client.resources.list_by_resource_group(
                resource_group_name
            )
            if resource_type in resource.type
        ]

        if not resources:
            print(
                f"The resource group {resource_group_name} "
                f"does not contain any resources of {resource_type}, skipping..."
            )
            return

        print(
            f"The resource group {resource_group_name} contains the following resources of {resource_type}:"
        )
        for resource in resources:
            print(f" - {resource.name}")
            # Fetch the full resource by id to read its properties.
            details = arm_client.resources.get_by_id(
                resource.id, api_version=api_version
            )
            print(details.properties["vaultUri"])


# Instantiate the class and run the check.
# check_resources is declared as a @staticmethod, so it is reached through the
# instance here but does not use any instance state.
obj = CheckAzure()
obj.check_resources()

Using the Azure metadata service, query some properties of your VM

import os
from azure.identity import ManagedIdentityCredential
import requests
import json

# NOTE(review): this credential is not used by the metadata query below, which
# is a plain HTTP request; it is kept from the original example.
credential = ManagedIdentityCredential()

# Disable proxying for every host in this process so the request to the
# link-local metadata address is sent directly.
os.environ["NO_PROXY"] = "*"
no_proxy = os.environ["NO_PROXY"]

url = "http://169.254.169.254/metadata/instance?api-version=2021-02-01"  # Azure metadata service
headers = {"Metadata": "true"}  # header required by the metadata service

# The timeout stops the script from hanging when it is not run on an Azure VM;
# raise_for_status surfaces HTTP errors instead of parsing an error body.
r = requests.get(url=url, headers=headers, timeout=10)
r.raise_for_status()

parsed_json = json.loads(r.content)  # decode the response body as JSON

pretty_json = json.dumps(parsed_json, indent=2)  # re-serialise with indentation for display

print(pretty_json)

subscription_id = parsed_json["compute"]["subscriptionId"]  # subscription id reported by the service

resource_id = parsed_json["compute"]["resourceId"]  # full resource id of this VM

# Split the resource id on "/". The id starts with "/", so index 0 is an empty
# string and the interesting values sit at the indexes used below.
segments = resource_id.split("/")

print(segments)  # shows the pieces as a list

segment_subscription_id = segments[2]  # subscription id (same value as subscription_id above)
segment_resource_group_name = segments[4]  # resource group name
segment_provider = segments[6]  # provider namespace, e.g. Microsoft.Compute when run from a VM
segment_resource_type = segments[7]  # resource type, e.g. virtualMachines
segment_resource_name = segments[8]  # resource name

print(
    f"The subscription id is {segment_subscription_id}\n"
    f"The resource group name is {segment_resource_group_name}\n"
    f"The provider name is {segment_provider}\n"
    f"The resource type is {segment_resource_type}\n"
    f"The resource name is {segment_resource_name}\n"
)

Example Python Function app using a Managed Identity

from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient
import azure.functions as func
import os
import logging
import datetime

# Settings are read from the environment when the module is imported; a missing
# variable raises KeyError. client_id is passed to DefaultAzureCredential in
# main() to select the user-assigned managed identity.
subscription_id = os.environ["ARM_SUBSCRIPTION_ID"]
# NOTE(review): tenant_id and function_app_name are not referenced by the
# visible code; they only enforce that the variables are set.
tenant_id = os.environ["ARM_TENANT_ID"]
client_id = os.environ["ARM_CLIENT_ID"]
function_app_name = os.environ["FUNCTION_APP_NAME"]
resource_group_name = os.environ["RESOURCE_GROUP_NAME"]

def main(getazureinfo: func.TimerRequest) -> None:
    """Timer-triggered entry point: log the id of every resource in the resource group.

    Args:
        getazureinfo: The timer trigger payload; it is not inspected here.
    """
    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated and returns
    # a naive value that had to be patched with replace(tzinfo=...).
    utc_timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()

    credential = DefaultAzureCredential(managed_identity_client_id=client_id)  # User assigned Managed Identity
    # credential = DefaultAzureCredential() # system-assigned identity

    # Create a ResourceManagementClient using the authenticated credential
    client = ResourceManagementClient(credential, subscription_id)

    # Get the current resource group
    resource_group = client.resource_groups.get(resource_group_name)

    # List all resources in the resource group
    resources = client.resources.list_by_resource_group(resource_group.name)

    # Log the id of each resource (lazy %-style arguments, same message text)
    for resource in resources:
        logging.info(
            "The resources within %s: %s", resource_group_name, resource.id
        )

    logging.info('Python timer trigger function ran at %s', utc_timestamp)

Upgrade all outdated pip packages

Windows

# Windows (PowerShell): upgrade pip itself, then upgrade every installed package by name
python3 -m pip install --upgrade pip
pip freeze | ForEach-Object { $_.split('==')[0] } | ForEach-Object { pip install --upgrade $_ }

Linux

# Skip the two header lines of `pip3 list`, take the package-name column, and
# upgrade one package per pip invocation. `-r` stops xargs from running
# `pip3 install -U` with no arguments when nothing is outdated.
pip3 list --outdated | awk 'NR>=3 {print $1}' | xargs -r -n1 pip3 install -U

Various Python Utils and code examples

import logging
import requests
import time
import os
from pathlib import Path
from urllib.parse import urljoin


class CustomFormatter(logging.Formatter):
    """Logging formatter that wraps each record in an ANSI colour chosen by level."""

    lightcyan = "\033[1;36m"
    lightred = "\033[1;31m"
    yellow = "\033[1;33m"
    nocolor = "\033[0m"
    purple = "\033[0;35m"
    # Called line_format rather than ``format`` so the class body does not bind
    # one name to both this string and the format() method defined below.
    line_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    # Format string per level: colour code + common layout + colour reset.
    FORMATS = {
        logging.DEBUG: lightcyan + line_format + nocolor,
        logging.INFO: purple + line_format + nocolor,
        logging.WARNING: yellow + line_format + nocolor,
        logging.ERROR: lightred + line_format + nocolor,
        logging.CRITICAL: lightred + line_format + nocolor,
    }

    def format(self, record):
        """Format *record* with the colourised layout for its level.

        Levels without an entry in FORMATS yield ``None`` here, which makes
        ``logging.Formatter`` fall back to its default format.
        """
        log_fmt = self.FORMATS.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)


# Console handler that emits everything from DEBUG upwards, colourised by
# CustomFormatter
ch = logging.StreamHandler()
ch.setFormatter(CustomFormatter())
ch.setLevel(logging.DEBUG)

# Module logger: accept all levels and send records to the console handler
logger = logging.getLogger(__name__)
logger.addHandler(ch)
logger.setLevel(logging.DEBUG)


def print_success(message):
    """Print *message* to stdout in bold cyan, then reset the colour."""
    start, reset = "\033[1;36m", "\033[0m"
    print(f"{start}{message}{reset}")


def print_error(message):
    """Print *message* to stdout in bold red, then reset the colour."""
    start, reset = "\033[1;31m", "\033[0m"
    print(f"{start}{message}{reset}")


def print_alert(message):
    """Print *message* to stdout in bold yellow, then reset the colour."""
    start, reset = "\033[1;33m", "\033[0m"
    print(f"{start}{message}{reset}")


def handle_download_error(arch):
    """Report, via print_error, that the providers for *arch* could not be downloaded."""
    explanation = (
        f"- Failed to download {arch} providers, it is possible the providers in question "
        "do not have a release for that platform e.g, no windows_amd64 release but has a "
        "darwin and linux release. Another factor could be the proxy interfering with the "
        "download or a general failure has occurred\n"
    )
    print_error(explanation)


def get_provider_data_with_retry(url, retries=3, backoff_in_seconds=1, timeout=30):
    """Fetch *url* and return its decoded JSON body, retrying on failure.

    Args:
        url: Address to GET.
        retries: Maximum number of attempts. With a value of 0 or less no
            request is made and ``None`` is returned.
        backoff_in_seconds: Delay before the second attempt; doubled after
            every failed attempt.
        timeout: Seconds to wait for the server on each attempt, so a stalled
            connection counts as a failed attempt instead of blocking forever.

    Returns:
        The parsed JSON document.

    Raises:
        requests.RequestException, ValueError: Re-raised from the final
            attempt (request failure or a body that is not valid JSON).
    """
    for n in range(retries):
        try:
            logger.debug("Attempting to get provider data")
            return requests.get(url, timeout=timeout).json()
        except (requests.RequestException, ValueError) as e:
            if n == retries - 1:  # This was the last attempt
                logger.error(f"An error has occurred: {e}")
                raise
            print_error(
                f"Failed to fetch provider data, attempt {n+1} of {retries}. Retrying in {backoff_in_seconds} seconds."
            )
            time.sleep(backoff_in_seconds)
            backoff_in_seconds *= 2  # Exponential backoff


def get_latest_terraform_version():
    """Query the HashiCorp checkpoint API, print the current Terraform version and return it."""
    payload = requests.get(
        "https://checkpoint-api.hashicorp.com/v1/check/terraform"
    ).json()
    current = payload["current_version"]
    print(f"Current version: {current}")
    return current


def check_terraform_version_and_get_url(version):
    """Return the linux_amd64 release URL for *version*, or None if a HEAD request is not answered with 200."""
    candidate = "https://releases.hashicorp.com/terraform/{}/terraform_{}_linux_amd64.zip".format(
        version, version
    )
    if requests.head(candidate).status_code != 200:
        return None
    return candidate


def download_terraform_zip(
    url,
    version,
    destination_path,
    os_names=("linux", "darwin", "windows"),
    retries=3,
    backoff_in_seconds=1,
    timeout=60,
):
    """Download the Terraform archive once per OS name, retrying failed downloads.

    Each archive is saved as
    ``<destination_path>/terraform_<version>_<os_name>_amd64.zip``.

    Args:
        url: Address to download. If it contains the literal placeholder
            ``{os_name}``, the placeholder is replaced with each OS name;
            otherwise the same URL is fetched for every OS name.
            NOTE(review): without the placeholder every saved file has the
            same content regardless of its name - confirm callers intend that.
        version: Version string used in the saved file name.
        destination_path: Directory to write into; created if missing.
        os_names: OS names to download for.
        retries: Maximum number of attempts per OS name.
        backoff_in_seconds: Delay before the second attempt for an OS name;
            doubled after every failed attempt.
        timeout: Seconds to wait for the server on each attempt.

    Raises:
        requests.RequestException, OSError: Re-raised when the last attempt
            for an OS name fails (request error, HTTP error status, or a
            failure writing the file).
    """
    for os_name in os_names:
        file_path = os.path.join(
            destination_path, f"terraform_{version}_{os_name}_amd64.zip"
        )
        os_url = url.replace("{os_name}", os_name)
        delay = backoff_in_seconds

        # Retry per file and stop at the first success, so a file that
        # downloaded fine is not fetched again on later attempts.
        for n in range(retries):
            try:
                response = requests.get(os_url, timeout=timeout)
                # Do not save an HTTP error page under a .zip name.
                response.raise_for_status()

                # Create the destination directory if it does not exist
                os.makedirs(os.path.dirname(file_path), exist_ok=True)

                with open(file_path, "wb") as f:
                    f.write(response.content)
            except (requests.RequestException, OSError) as e:
                if n == retries - 1:  # This was the last attempt
                    logger.error(f"An error has occurred during the download: {e}")
                    raise
                logger.debug(
                    f"Failed to download {os_url}, attempt {n + 1} of {retries}. Retrying in {delay} seconds."
                )
                time.sleep(delay)
                delay *= 2  # Exponential backoff
            else:
                logger.debug(f"Downloaded {os_url} to {file_path}")
                break

Sort terraform variables.tf

import re
import argparse


def read_terraform_file(filename):
    """Return the entire text content of *filename*."""
    with open(filename) as source:
        return source.read()


def write_terraform_file(filename, sorted_vars):
    """Overwrite *filename* with the text in *sorted_vars*."""
    with open(filename, mode="w") as destination:
        destination.write(sorted_vars)


def sort_terraform_variables(content):
    """Return the ``variable`` blocks found in *content*, sorted by name.

    A block starts with ``variable "<name>" {`` at the beginning of a line and
    ends at the first following line that begins with ``}``, so nested braces
    on indented lines stay inside the block. Anchoring to the line start keeps
    commented-out or quoted occurrences of ``variable "..."`` from being picked
    up as blocks. Text outside the matched blocks is not part of the result.

    Returns:
        The blocks separated by one blank line and terminated by a newline,
        or an empty string when no block is found.
    """
    pattern = re.compile(
        r'^variable\s+"([^"]+)"\s+\{.*?\n\}', re.DOTALL | re.MULTILINE
    )
    # sorted() is stable, so blocks sharing a name keep their original order.
    blocks = sorted(pattern.finditer(content), key=lambda match: match.group(1))
    if not blocks:
        return ""
    return "\n\n".join(match.group(0) for match in blocks) + "\n"


def parse_arguments():
    """Parse the command line: -i/--in-file and -o/--out-file, both defaulting to ./variables.tf."""
    parser = argparse.ArgumentParser(description="Sort Terraform variables.")
    options = (
        ("-i", "--in-file", "Input Terraform file"),
        ("-o", "--out-file", "Output Terraform file"),
    )
    for short_flag, long_flag, help_text in options:
        parser.add_argument(
            short_flag,
            long_flag,
            type=str,
            help=help_text,
            default="./variables.tf",
        )
    return parser.parse_args()


def main():
    """Read the input file, sort its variable blocks and write them to the output file.

    Only the text returned by ``sort_terraform_variables`` is written. When
    that text is empty nothing is written and the script exits with an error,
    because the output file (by default the input file itself) would otherwise
    be truncated.
    """
    args = parse_arguments()
    content = read_terraform_file(args.in_file)
    sorted_vars = sort_terraform_variables(content)
    if not sorted_vars:
        raise SystemExit(
            f"No variable blocks found in {args.in_file}; nothing written."
        )
    write_terraform_file(args.out_file, sorted_vars)
    print(f"Sorted variables written to {args.out_file}")


if __name__ == "__main__":
    main()

Sort terraform outputs.tf

import re
import argparse


def read_terraform_file(filename):
    """Open *filename* for reading as text and return everything in it."""
    with open(filename) as handle:
        text = handle.read()
    return text


def write_terraform_file(filename, sorted_vars):
    """Replace the content of *filename* with the text in *sorted_vars*."""
    with open(filename, mode="w") as target:
        target.write(sorted_vars)


def sort_terraform_outputs(content):
    """Return the ``output`` blocks found in *content*, sorted by name.

    A block starts with ``output "<name>" {`` at the beginning of a line and
    ends at the first following line that begins with ``}``. Anchoring to the
    line start keeps commented-out or quoted occurrences of ``output "..."``
    from being picked up as blocks. Text outside the matched blocks is not
    part of the result.

    Returns:
        The blocks separated by one blank line and terminated by a newline,
        or an empty string when no block is found.
    """
    pattern = re.compile(
        r'^output\s+"([^"]+)"\s+\{.*?\n\}', re.DOTALL | re.MULTILINE
    )
    # sorted() is stable, so blocks sharing a name keep their original order.
    blocks = sorted(pattern.finditer(content), key=lambda match: match.group(1))
    if not blocks:
        return ""
    return "\n\n".join(match.group(0) for match in blocks) + "\n"


def parse_arguments():
    """Parse the command line: -i/--in-file and -o/--out-file, both defaulting to ./outputs.tf."""
    parser = argparse.ArgumentParser(description="Sort Terraform output variables.")
    options = (
        ("-i", "--in-file", "Input Terraform file"),
        ("-o", "--out-file", "Output Terraform file"),
    )
    for short_flag, long_flag, help_text in options:
        parser.add_argument(
            short_flag,
            long_flag,
            type=str,
            help=help_text,
            default="./outputs.tf",
        )
    return parser.parse_args()


def main():
    """Read the input file, sort its output blocks and write them to the output file.

    Only the text returned by ``sort_terraform_outputs`` is written. When that
    text is empty nothing is written and the script exits with an error,
    because the output file (by default the input file itself) would otherwise
    be truncated.
    """
    args = parse_arguments()
    content = read_terraform_file(args.in_file)
    sorted_outputs = sort_terraform_outputs(content)
    if not sorted_outputs:
        raise SystemExit(
            f"No output blocks found in {args.in_file}; nothing written."
        )
    write_terraform_file(args.out_file, sorted_outputs)
    print(f"Sorted outputs written to {args.out_file}")


if __name__ == "__main__":
    main()

Source: docs/cheatsheets/python-cheatsheet.md