Aller au contenu principal
PythonDevOpsAutomatisationScripting

Scripting DevOps : automatisation

30 min de lecture Python DevOps — Chapitre 5

subprocess pour exécuter des commandes, os/sys/pathlib pour le filesystem, requests pour les API HTTP, et Paramiko pour le SSH en Python.

Tu connais Python. Maintenant, il faut le faire bosser. Ce chapitre est le cœur de la série : on utilise Python pour faire ce que tu fais au quotidien en DevOps — exécuter des commandes système, interagir avec le filesystem, appeler des API REST et se connecter en SSH sur tes serveurs. Chaque exemple est fonctionnel et prêt à être adapté à ton infra.

Subprocess : piloter le système depuis Python

Le module subprocess est le pont entre Python et le shell. Il remplace les scripts bash fragiles par du code testable et maintenable.

subprocess.run — la méthode standard

import subprocess

# Commande simple avec capture de sortie
result = subprocess.run(
    ["docker", "ps", "--format", "{{.Names}}\t{{.Status}}"],
    capture_output=True,
    text=True,
    check=True  # lève CalledProcessError si returncode != 0
)

for line in result.stdout.strip().split("\n"):
    name, status = line.split("\t")
    print(f"  {name}: {status}")

# Gestion d'erreurs complète
try:
    result = subprocess.run(
        ["systemctl", "status", "nginx"],
        capture_output=True, text=True,
        check=True, timeout=10
    )
    print("✅ Nginx actif")
except subprocess.CalledProcessError as e:
    print(f"❌ Nginx inactif (code {e.returncode})")
except subprocess.TimeoutExpired:
    print("⏱️ Timeout dépassé")
except FileNotFoundError:
    print("❌ Commande introuvable")

🔥 Bonne pratique : toujours passer la commande en liste (["ls", "-la"]) plutôt qu’en string. La forme string avec shell=True expose à des injections de commandes si les arguments viennent d’un utilisateur.

Streaming en temps réel

Pour suivre un build Docker ou un déploiement long, Popen permet d’afficher la sortie ligne par ligne :

import subprocess
import sys

def run_streaming(command):
    """Exécute une commande avec sortie en temps réel."""
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True
    )
    for line in process.stdout:
        print(line, end="")
        sys.stdout.flush()
    process.wait()
    return process.returncode

# Suivre un build en live
code = run_streaming(["docker", "build", "-t", "myapp:latest", "."])
if code != 0:
    print("❌ Build échoué")

💡 Astuce : subprocess.STDOUT redirige stderr vers stdout, ce qui te donne un flux unique et ordonné. Pratique pour les logs de build qui mélangent les deux.

Filesystem : os, sys et pathlib

Python offre trois modules complémentaires pour interagir avec le système de fichiers. En 2026, pathlib est le choix par défaut — mais os et sys restent indispensables pour certaines opérations.

import os
import sys
from pathlib import Path

# --- os : variables d'environnement et infos système ---
home = os.getenv("HOME")
db_url = os.getenv("DATABASE_URL", "sqlite:///local.db")
os.environ["APP_ENV"] = "production"
print(f"CPUs: {os.cpu_count()}, PID: {os.getpid()}")

# --- sys : arguments et plateforme ---
print(sys.argv)          # ['script.py', '--env', 'prod']
print(sys.platform)      # 'linux', 'darwin', 'win32'

if not Path("/etc/myapp/config.yaml").exists():
    print("❌ Config manquante", file=sys.stderr)
    sys.exit(1)

# --- pathlib : manipulation moderne des chemins ---
log_dir = Path("/var/log/myapp")
log_dir.mkdir(parents=True, exist_ok=True)

# Lister les fichiers log avec leur taille
for f in Path("/var/log").glob("**/*.log"):
    size_mb = f.stat().st_size / 1024 / 1024
    print(f"  {f.name}{size_mb:.1f} MB")

# Trouver les fichiers modifiés dans la dernière heure
import time
cutoff = time.time() - 3600
recent = [f for f in log_dir.rglob("*.log") if f.stat().st_mtime > cutoff]

⚠️ Piège classique : ne jamais construire des chemins avec des f-strings (f"/var/log/{name}") ou des +. L’opérateur / de Pathlib est cross-platform et gère automatiquement les séparateurs Windows/Linux.

Requests : appels API REST

La bibliothèque requests est le standard pour interagir avec les API — GitHub, Kubernetes, cloud providers, monitoring.

import requests

# GET simple
r = requests.get("https://api.github.com/repos/docker/docker")
print(f"Stars: {r.json()['stargazers_count']}")

# POST avec authentification
headers = {"Authorization": "token ghp_xxx"}
data = {"name": "new-repo", "private": True}
r = requests.post("https://api.github.com/user/repos", json=data, headers=headers)
r.raise_for_status()  # lève HTTPError si status >= 400

Session avec retry automatique

En production, les requêtes échouent (timeout, 503, rate limit). Une session avec retry intégrée évite de gérer ça manuellement partout :

from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_session(retries=3, backoff=0.5, timeout=10):
    """Session HTTP avec retry et backoff exponentiel."""
    session = requests.Session()
    retry = Retry(
        total=retries,
        backoff_factor=backoff,
        status_forcelist=[429, 500, 502, 503, 504]
    )
    session.mount("http://", HTTPAdapter(max_retries=retry))
    session.mount("https://", HTTPAdapter(max_retries=retry))
    return session

# Utilisation — résiste aux erreurs transitoires
session = create_session()
r = session.get("https://api.example.com/health", timeout=10)

🎯 En entreprise : cette session avec retry est le premier truc à mettre en place dans tout client API Python. Les status_forcelist couvrent les cas classiques — rate limiting (429), erreurs serveur (5xx). Le backoff exponentiel évite de marteler un service déjà en difficulté.

Paramiko : SSH depuis Python

Paramiko remplace les scripts bash bourrés de ssh user@host "commande" par du code Python propre avec gestion d’erreurs, parallélisme et transfert de fichiers.

import paramiko

def ssh_execute(host, username, command, key_path=None, port=22):
    """Exécute une commande SSH sur un serveur distant."""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(host, port=port, username=username, key_filename=key_path)
        stdin, stdout, stderr = client.exec_command(command, timeout=30)
        return {
            "output": stdout.read().decode().strip(),
            "error": stderr.read().decode().strip(),
            "exit_code": stdout.channel.recv_exit_status()
        }
    finally:
        client.close()

# Utilisation
result = ssh_execute("10.0.1.10", "admin", "uptime && df -h /", key_path="~/.ssh/id_rsa")
print(result["output"])

Exécution parallèle sur plusieurs serveurs

from concurrent.futures import ThreadPoolExecutor, as_completed

def multi_ssh(servers, command, username, key_path, max_workers=5):
    """Exécute une commande sur N serveurs en parallèle."""
    def run_on(server):
        try:
            return server, ssh_execute(server, username, command, key_path)
        except Exception as e:
            return server, {"output": "", "error": str(e), "exit_code": -1}

    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(run_on, s): s for s in servers}
        for future in as_completed(futures):
            server, result = future.result()
            results[server] = result
            icon = "" if result["exit_code"] == 0 else ""
            print(f"  {icon} {server}: {result['output'][:80]}")
    return results

# Vérifier l'uptime de tout un parc
servers = ["10.0.1.10", "10.0.1.11", "10.0.1.12"]
multi_ssh(servers, "uptime", "admin", "~/.ssh/id_rsa")

🔥 Pourquoi Python plutôt qu’un script bash : le ThreadPoolExecutor exécute les commandes en parallèle sur 5 serveurs simultanément. En bash, tu aurais besoin de &, wait, et de gérer les PID manuellement — avec Paramiko, c’est 15 lignes lisibles.

Transfert de fichiers (SFTP)

Paramiko intègre aussi un client SFTP pour déployer des fichiers sur tes serveurs :

def sftp_upload(host, username, local_path, remote_path, key_path):
    """Upload un fichier via SFTP."""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(host, username=username, key_filename=key_path)
        sftp = client.open_sftp()
        sftp.put(str(local_path), str(remote_path))
        print(f"✅ {local_path}{host}:{remote_path}")
        sftp.close()
    finally:
        client.close()

# Déployer un script sur un serveur
sftp_upload("10.0.1.10", "admin", "deploy.sh", "/opt/scripts/deploy.sh", "~/.ssh/id_rsa")

💡 Alternative : pour des déploiements complexes multi-serveurs, regarde Ansible (qui utilise Paramiko sous le capot). Mais pour des scripts ponctuels ou des outils custom, Paramiko reste plus léger et flexible.

Pièges fréquents et résumé

⚠️ Les erreurs classiques en scripting Python :

  • shell=True avec des inputs utilisateur — injection de commandes. Toujours la forme en liste quand possible
  • Oublier timeout — un requests.get ou subprocess.run sans timeout peut bloquer indéfiniment
  • AutoAddPolicy en production — acceptable en dev, mais en prod utilise RejectPolicy avec un known_hosts vérifié
  • Ne pas fermer les connexions SSH — utilise finally ou un context manager pour garantir le client.close()
  • Hardcoder des credentials — utilise des variables d’environnement ou un .env avec python-dotenv

🎯 Ce qu’il faut retenir :

  • subprocess.run avec capture_output=True, text=True, check=True — la combinaison standard
  • pathlib.Path pour tout ce qui touche aux fichiers et répertoires
  • requests.Session avec retry pour tout client API
  • paramiko + ThreadPoolExecutor pour le SSH parallèle sur un parc de serveurs
  • Toujours ajouter timeout à tes appels réseau et commandes système

Dans le prochain chapitre, on passe au parsing de logs, expressions régulières et monitoring — les outils pour comprendre ce qui se passe dans ton infrastructure.

Articles liés