Tu connais Python. Maintenant, il faut le faire bosser. Ce chapitre est le cœur de la série : on utilise Python pour faire ce que tu fais au quotidien en DevOps — exécuter des commandes système, interagir avec le filesystem, appeler des API REST et se connecter en SSH sur tes serveurs. Chaque exemple est fonctionnel et prêt à être adapté à ton infra.
Subprocess : piloter le système depuis Python
Le module subprocess est le pont entre Python et le shell. Il remplace les scripts bash fragiles par du code testable et maintenable.
subprocess.run — la méthode standard
import subprocess
# Commande simple avec capture de sortie
result = subprocess.run(
["docker", "ps", "--format", "{{.Names}}\t{{.Status}}"],
capture_output=True,
text=True,
check=True # lève CalledProcessError si returncode != 0
)
for line in result.stdout.strip().split("\n"):
name, status = line.split("\t")
print(f" {name}: {status}")
# Gestion d'erreurs complète
try:
result = subprocess.run(
["systemctl", "status", "nginx"],
capture_output=True, text=True,
check=True, timeout=10
)
print("✅ Nginx actif")
except subprocess.CalledProcessError as e:
print(f"❌ Nginx inactif (code {e.returncode})")
except subprocess.TimeoutExpired:
print("⏱️ Timeout dépassé")
except FileNotFoundError:
print("❌ Commande introuvable")
🔥 Bonne pratique : toujours passer la commande en liste (["ls", "-la"]) plutôt qu’en string. La forme string avec shell=True expose à des injections de commandes si les arguments viennent d’un utilisateur.
Streaming en temps réel
Pour suivre un build Docker ou un déploiement long, Popen permet d’afficher la sortie ligne par ligne :
import subprocess
import sys
def run_streaming(command):
"""Exécute une commande avec sortie en temps réel."""
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True
)
for line in process.stdout:
print(line, end="")
sys.stdout.flush()
process.wait()
return process.returncode
# Suivre un build en live
code = run_streaming(["docker", "build", "-t", "myapp:latest", "."])
if code != 0:
print("❌ Build échoué")
💡 Astuce : subprocess.STDOUT redirige stderr vers stdout, ce qui te donne un flux unique et ordonné. Pratique pour les logs de build qui mélangent les deux.
Filesystem : os, sys et pathlib
Python offre trois modules complémentaires pour interagir avec le système de fichiers. En 2026, pathlib est le choix par défaut — mais os et sys restent indispensables pour certaines opérations.
import os
import sys
from pathlib import Path
# --- os : variables d'environnement et infos système ---
home = os.getenv("HOME")
db_url = os.getenv("DATABASE_URL", "sqlite:///local.db")
os.environ["APP_ENV"] = "production"
print(f"CPUs: {os.cpu_count()}, PID: {os.getpid()}")
# --- sys : arguments et plateforme ---
print(sys.argv) # ['script.py', '--env', 'prod']
print(sys.platform) # 'linux', 'darwin', 'win32'
if not Path("/etc/myapp/config.yaml").exists():
print("❌ Config manquante", file=sys.stderr)
sys.exit(1)
# --- pathlib : manipulation moderne des chemins ---
log_dir = Path("/var/log/myapp")
log_dir.mkdir(parents=True, exist_ok=True)
# Lister les fichiers log avec leur taille
for f in Path("/var/log").glob("**/*.log"):
size_mb = f.stat().st_size / 1024 / 1024
print(f" {f.name} → {size_mb:.1f} MB")
# Trouver les fichiers modifiés dans la dernière heure
import time
cutoff = time.time() - 3600
recent = [f for f in log_dir.rglob("*.log") if f.stat().st_mtime > cutoff]
⚠️ Piège classique : ne jamais construire des chemins avec des f-strings (f"/var/log/{name}") ou des +. L’opérateur / de Pathlib est cross-platform et gère automatiquement les séparateurs Windows/Linux.
Requests : appels API REST
La bibliothèque requests est le standard pour interagir avec les API — GitHub, Kubernetes, cloud providers, monitoring.
import requests
# GET simple
r = requests.get("https://api.github.com/repos/docker/docker")
print(f"Stars: {r.json()['stargazers_count']}")
# POST avec authentification
headers = {"Authorization": "token ghp_xxx"}
data = {"name": "new-repo", "private": True}
r = requests.post("https://api.github.com/user/repos", json=data, headers=headers)
r.raise_for_status() # lève HTTPError si status >= 400
Session avec retry automatique
En production, les requêtes échouent (timeout, 503, rate limit). Une session avec retry intégrée évite de gérer ça manuellement partout :
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session(retries=3, backoff=0.5, timeout=10):
"""Session HTTP avec retry et backoff exponentiel."""
session = requests.Session()
retry = Retry(
total=retries,
backoff_factor=backoff,
status_forcelist=[429, 500, 502, 503, 504]
)
session.mount("http://", HTTPAdapter(max_retries=retry))
session.mount("https://", HTTPAdapter(max_retries=retry))
return session
# Utilisation — résiste aux erreurs transitoires
session = create_session()
r = session.get("https://api.example.com/health", timeout=10)
🎯 En entreprise : cette session avec retry est le premier truc à mettre en place dans tout client API Python. Les status_forcelist couvrent les cas classiques — rate limiting (429), erreurs serveur (5xx). Le backoff exponentiel évite de marteler un service déjà en difficulté.
Paramiko : SSH depuis Python
Paramiko remplace les scripts bash bourrés de ssh user@host "commande" par du code Python propre avec gestion d’erreurs, parallélisme et transfert de fichiers.
import paramiko
def ssh_execute(host, username, command, key_path=None, port=22):
"""Exécute une commande SSH sur un serveur distant."""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(host, port=port, username=username, key_filename=key_path)
stdin, stdout, stderr = client.exec_command(command, timeout=30)
return {
"output": stdout.read().decode().strip(),
"error": stderr.read().decode().strip(),
"exit_code": stdout.channel.recv_exit_status()
}
finally:
client.close()
# Utilisation
result = ssh_execute("10.0.1.10", "admin", "uptime && df -h /", key_path="~/.ssh/id_rsa")
print(result["output"])
Exécution parallèle sur plusieurs serveurs
from concurrent.futures import ThreadPoolExecutor, as_completed
def multi_ssh(servers, command, username, key_path, max_workers=5):
"""Exécute une commande sur N serveurs en parallèle."""
def run_on(server):
try:
return server, ssh_execute(server, username, command, key_path)
except Exception as e:
return server, {"output": "", "error": str(e), "exit_code": -1}
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(run_on, s): s for s in servers}
for future in as_completed(futures):
server, result = future.result()
results[server] = result
icon = "✅" if result["exit_code"] == 0 else "❌"
print(f" {icon} {server}: {result['output'][:80]}")
return results
# Vérifier l'uptime de tout un parc
servers = ["10.0.1.10", "10.0.1.11", "10.0.1.12"]
multi_ssh(servers, "uptime", "admin", "~/.ssh/id_rsa")
🔥 Pourquoi Python plutôt qu’un script bash : le ThreadPoolExecutor exécute les commandes en parallèle sur 5 serveurs simultanément. En bash, tu aurais besoin de &, wait, et de gérer les PID manuellement — avec Paramiko, c’est 15 lignes lisibles.
Transfert de fichiers (SFTP)
Paramiko intègre aussi un client SFTP pour déployer des fichiers sur tes serveurs :
def sftp_upload(host, username, local_path, remote_path, key_path):
"""Upload un fichier via SFTP."""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(host, username=username, key_filename=key_path)
sftp = client.open_sftp()
sftp.put(str(local_path), str(remote_path))
print(f"✅ {local_path} → {host}:{remote_path}")
sftp.close()
finally:
client.close()
# Déployer un script sur un serveur
sftp_upload("10.0.1.10", "admin", "deploy.sh", "/opt/scripts/deploy.sh", "~/.ssh/id_rsa")
💡 Alternative : pour des déploiements complexes multi-serveurs, regarde Ansible (qui utilise Paramiko sous le capot). Mais pour des scripts ponctuels ou des outils custom, Paramiko reste plus léger et flexible.
Pièges fréquents et résumé
⚠️ Les erreurs classiques en scripting Python :
shell=Trueavec des inputs utilisateur — injection de commandes. Toujours la forme en liste quand possible- Oublier
timeout— unrequests.getousubprocess.runsans timeout peut bloquer indéfiniment AutoAddPolicyen production — acceptable en dev, mais en prod utiliseRejectPolicyavec unknown_hostsvérifié- Ne pas fermer les connexions SSH — utilise
finallyou un context manager pour garantir leclient.close() - Hardcoder des credentials — utilise des variables d’environnement ou un
.envavecpython-dotenv
🎯 Ce qu’il faut retenir :
subprocess.runaveccapture_output=True, text=True, check=True— la combinaison standardpathlib.Pathpour tout ce qui touche aux fichiers et répertoiresrequests.Sessionavec retry pour tout client APIparamiko+ThreadPoolExecutorpour le SSH parallèle sur un parc de serveurs- Toujours ajouter
timeoutà tes appels réseau et commandes système
Dans le prochain chapitre, on passe au parsing de logs, expressions régulières et monitoring — les outils pour comprendre ce qui se passe dans ton infrastructure.
Contenu réservé aux abonnés
Ce chapitre fait partie de la formation complète. Abonne-toi pour débloquer tous les contenus.
Débloquer pour 29 CHF/moisLe chapitre 1 de chaque formation est gratuit.
Série pas encore débloquée
Termine la série prérequise d'abord pour accéder à ce contenu.
Aller à la série prérequiseSérie : Python DevOps
5 / 8Sur cette page
Articles liés
Parsing de logs et monitoring
Scripts DevOps concrets : déploiement automatisé, health checks, parsing de logs, et monitoring système avec Python.
Bash scripting : les bases
Apprends les bases du scripting Bash : variables, conditions, boucles et l'anatomie d'un script.
Scripts Bash avancés pour le DevOps
Maîtrise les fonctions, les pipes, les commandes essentielles, les cron jobs et des scripts pratiques du monde réel.