Dans le chapitre précédent, on a vu les outils Python pour l’automatisation : subprocess, os/pathlib, requests et Paramiko. Maintenant, on assemble tout dans des scripts DevOps concrets.
🎯 Objectif : À la fin de ce chapitre, tu maîtriseras les concepts présentés ci-dessous.
Scripts DevOps concrets
Script de déploiement
#!/usr/bin/env python3
"""
Script de déploiement automatisé.
Usage: python deploy.py --service api --env production --version v2.1.0
"""
import subprocess
import sys
import json
import logging
from datetime import datetime
from pathlib import Path
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("deploy")
class Deployer:
def __init__(self, service, env, version):
self.service = service
self.env = env
self.version = version
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
def run_cmd(self, cmd, check=True):
"""Exécute une commande et retourne la sortie."""
logger.info(f"$ {cmd}")
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=check)
if result.stdout:
logger.debug(result.stdout[:200])
return result
def pre_checks(self):
"""Vérifications avant déploiement."""
logger.info("🔍 Vérifications pré-déploiement...")
# Vérifier que l'image existe
result = self.run_cmd(
f"docker manifest inspect registry.example.com/{self.service}:{self.version}",
check=False
)
if result.returncode != 0:
raise RuntimeError(f"Image {self.service}:{self.version} introuvable")
# Vérifier la connexion au cluster
self.run_cmd("kubectl cluster-info")
logger.info("✅ Pré-checks OK")
def backup(self):
"""Sauvegarde la configuration actuelle."""
logger.info("💾 Backup de la config actuelle...")
backup_dir = Path(f"/tmp/deploy-backups/{self.timestamp}")
backup_dir.mkdir(parents=True, exist_ok=True)
self.run_cmd(
f"kubectl get deployment {self.service} -n {self.env} -o yaml > {backup_dir}/deployment.yaml",
check=False
)
return backup_dir
def deploy(self):
"""Exécute le déploiement."""
logger.info(f"🚀 Déploiement de {self.service}:{self.version} en {self.env}...")
self.run_cmd(
f"kubectl set image deployment/{self.service} "
f"{self.service}=registry.example.com/{self.service}:{self.version} "
f"-n {self.env}"
)
# Attendre que le rollout soit terminé
self.run_cmd(
f"kubectl rollout status deployment/{self.service} -n {self.env} --timeout=300s"
)
def post_checks(self):
"""Vérifications post-déploiement."""
logger.info("🔍 Vérifications post-déploiement...")
result = self.run_cmd(
f"kubectl get pods -n {self.env} -l app={self.service} -o json"
)
pods = json.loads(result.stdout)
running = [
p for p in pods["items"]
if p["status"]["phase"] == "Running"
]
logger.info(f" Pods running : {len(running)}/{len(pods['items'])}")
if len(running) == 0:
raise RuntimeError("Aucun pod en running après déploiement")
def rollback(self):
"""Rollback en cas d'erreur."""
logger.warning("⏪ Rollback en cours...")
self.run_cmd(
f"kubectl rollout undo deployment/{self.service} -n {self.env}"
)
def run(self):
"""Exécute le pipeline de déploiement complet."""
try:
self.pre_checks()
self.backup()
self.deploy()
self.post_checks()
logger.info(f"✅ Déploiement réussi : {self.service}:{self.version} en {self.env}")
except Exception as e:
logger.error(f"❌ Déploiement échoué : {e}")
self.rollback()
sys.exit(1)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Déploiement automatisé")
parser.add_argument("--service", required=True, help="Nom du service")
parser.add_argument("--env", required=True, choices=["staging", "production"])
parser.add_argument("--version", required=True, help="Version/tag de l'image")
args = parser.parse_args()
deployer = Deployer(args.service, args.env, args.version)
deployer.run()
Script de health check
#!/usr/bin/env python3
"""
Health check multi-services avec alerting.
"""
import requests
import json
import time
import logging
from datetime import datetime
from pathlib import Path
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("healthcheck")
# Configuration des services à vérifier
SERVICES = [
{"name": "API Gateway", "url": "https://api.example.com/health", "timeout": 5},
{"name": "Frontend", "url": "https://www.example.com", "timeout": 10},
{"name": "Auth Service", "url": "https://auth.example.com/health", "timeout": 5},
{"name": "Prometheus", "url": "http://monitoring.internal:9090/-/healthy", "timeout": 5},
{"name": "Grafana", "url": "http://monitoring.internal:3000/api/health", "timeout": 5},
]
def check_http(name, url, timeout=5, expected_status=200):
"""Vérifie un endpoint HTTP."""
start = time.time()
try:
response = requests.get(url, timeout=timeout, allow_redirects=True)
elapsed = (time.time() - start) * 1000 # en ms
return {
"name": name,
"url": url,
"status": "healthy" if response.status_code == expected_status else "degraded",
"http_code": response.status_code,
"response_time_ms": round(elapsed, 2),
"timestamp": datetime.now().isoformat()
}
except requests.ConnectionError:
return {"name": name, "url": url, "status": "down", "error": "Connection refused",
"timestamp": datetime.now().isoformat()}
except requests.Timeout:
return {"name": name, "url": url, "status": "timeout", "error": f"Timeout ({timeout}s)",
"timestamp": datetime.now().isoformat()}
except Exception as e:
return {"name": name, "url": url, "status": "error", "error": str(e),
"timestamp": datetime.now().isoformat()}
def send_alert(service_result):
"""Envoie une alerte (webhook Slack, email, etc.)."""
# Exemple avec un webhook Slack
webhook_url = "https://hooks.slack.com/services/xxx/yyy/zzz"
payload = {
"text": f"🚨 *{service_result['name']}* is *{service_result['status']}*\n"
f"URL: {service_result['url']}\n"
f"Error: {service_result.get('error', 'N/A')}"
}
try:
requests.post(webhook_url, json=payload, timeout=5)
except Exception:
logger.error("Impossible d'envoyer l'alerte Slack")
def run_health_checks():
"""Exécute tous les health checks."""
results = []
for svc in SERVICES:
result = check_http(svc["name"], svc["url"], svc.get("timeout", 5))
results.append(result)
# Log
if result["status"] == "healthy":
logger.info(f"✅ {result['name']}: {result['status']} ({result.get('response_time_ms', '?')}ms)")
else:
logger.warning(f"❌ {result['name']}: {result['status']} — {result.get('error', '')}")
send_alert(result)
# Sauvegarder le rapport
report_path = Path("/tmp/health_report.json")
report_path.write_text(json.dumps(results, indent=2))
# Résumé
healthy = sum(1 for r in results if r["status"] == "healthy")
total = len(results)
logger.info(f"\n📊 Résultat : {healthy}/{total} services healthy")
return results
if __name__ == "__main__":
run_health_checks()
Script de parsing de logs
#!/usr/bin/env python3
"""
Analyseur de logs Nginx — extrait les métriques clés.
"""
import re
import json
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
# Pattern pour les logs Nginx au format combined
NGINX_PATTERN = re.compile(
r'(?P<ip>\S+) - \S+ \[(?P<time>[^\]]+)\] '
r'"(?P<method>\S+) (?P<path>\S+) \S+" '
r'(?P<status>\d{3}) (?P<size>\d+) '
r'"(?P<referrer>[^"]*)" "(?P<user_agent>[^"]*)"'
)
def parse_nginx_log(log_path, limit=None):
"""Parse un fichier de log Nginx et retourne des statistiques."""
stats = {
"total_requests": 0,
"status_codes": Counter(),
"methods": Counter(),
"top_paths": Counter(),
"top_ips": Counter(),
"errors": [],
"total_bytes": 0,
"response_codes_by_path": defaultdict(Counter),
}
with open(log_path, "r") as f:
for i, line in enumerate(f):
if limit and i >= limit:
break
match = NGINX_PATTERN.match(line.strip())
if not match:
continue
data = match.groupdict()
stats["total_requests"] += 1
stats["status_codes"][data["status"]] += 1
stats["methods"][data["method"]] += 1
stats["top_paths"][data["path"]] += 1
stats["top_ips"][data["ip"]] += 1
stats["total_bytes"] += int(data["size"])
stats["response_codes_by_path"][data["path"]][data["status"]] += 1
# Collecter les erreurs 5xx
if data["status"].startswith("5"):
stats["errors"].append({
"ip": data["ip"],
"path": data["path"],
"status": data["status"],
"time": data["time"],
"user_agent": data["user_agent"]
})
return stats
def print_report(stats):
"""Affiche un rapport lisible."""
print("=" * 60)
print("📊 RAPPORT D'ANALYSE DES LOGS NGINX")
print("=" * 60)
print(f"\n📈 Total de requêtes : {stats['total_requests']}")
print(f"📦 Données transférées : {stats['total_bytes'] / 1024 / 1024:.2f} MB")
print("\n📋 Codes de statut :")
for code, count in stats["status_codes"].most_common():
pct = count / stats["total_requests"] * 100
bar = "█" * int(pct / 2)
print(f" {code}: {count:>6} ({pct:5.1f}%) {bar}")
print("\n🔝 Top 10 des pages les plus visitées :")
for path, count in stats["top_paths"].most_common(10):
print(f" {count:>6} — {path}")
print("\n🌐 Top 10 des IPs :")
for ip, count in stats["top_ips"].most_common(10):
print(f" {count:>6} — {ip}")
print(f"\n🔴 Erreurs 5xx : {len(stats['errors'])}")
for err in stats["errors"][:5]:
print(f" [{err['time']}] {err['status']} {err['path']} — {err['ip']}")
print("=" * 60)
if __name__ == "__main__":
import sys
log_file = sys.argv[1] if len(sys.argv) > 1 else "/var/log/nginx/access.log"
if not Path(log_file).exists():
print(f"❌ Fichier non trouvé : {log_file}")
sys.exit(1)
stats = parse_nginx_log(log_file)
print_report(stats)
# Export JSON
export = {k: v for k, v in stats.items() if k != "response_codes_by_path"}
export["status_codes"] = dict(export["status_codes"])
export["methods"] = dict(export["methods"])
export["top_paths"] = dict(export["top_paths"].most_common(20))
export["top_ips"] = dict(export["top_ips"].most_common(20))
output = Path("/tmp/nginx_analysis.json")
output.write_text(json.dumps(export, indent=2, default=str))
print(f"\n💾 Rapport exporté dans {output}")
Bonus : monitoring système rapide
#!/usr/bin/env python3
"""
Collecte des métriques système basiques (sans dépendance externe).
"""
import os
import subprocess
from pathlib import Path
def get_cpu_usage():
"""Retourne l'utilisation CPU moyenne sur 1 seconde."""
with open("/proc/stat") as f:
line1 = f.readline().split()
import time
time.sleep(1)
with open("/proc/stat") as f:
line2 = f.readline().split()
idle1 = int(line1[4])
idle2 = int(line2[4])
total1 = sum(int(x) for x in line1[1:])
total2 = sum(int(x) for x in line2[1:])
idle_delta = idle2 - idle1
total_delta = total2 - total1
return round((1 - idle_delta / total_delta) * 100, 1)
def get_memory_usage():
"""Retourne l'utilisation mémoire."""
with open("/proc/meminfo") as f:
lines = f.readlines()
mem = {}
for line in lines:
parts = line.split()
mem[parts[0].rstrip(":")] = int(parts[1])
total = mem["MemTotal"]
available = mem["MemAvailable"]
used = total - available
return {
"total_mb": round(total / 1024, 0),
"used_mb": round(used / 1024, 0),
"available_mb": round(available / 1024, 0),
"percent": round(used / total * 100, 1)
}
def get_disk_usage(mount="/"):
"""Retourne l'utilisation disque."""
stat = os.statvfs(mount)
total = stat.f_blocks * stat.f_frsize
free = stat.f_bfree * stat.f_frsize
used = total - free
return {
"total_gb": round(total / 1024**3, 1),
"used_gb": round(used / 1024**3, 1),
"free_gb": round(free / 1024**3, 1),
"percent": round(used / total * 100, 1)
}
if __name__ == "__main__":
print("🖥️ Métriques système")
print(f" CPU : {get_cpu_usage()}%")
mem = get_memory_usage()
print(f" RAM : {mem['used_mb']}/{mem['total_mb']} MB ({mem['percent']}%)")
disk = get_disk_usage("/")
print(f" Disk: {disk['used_gb']}/{disk['total_gb']} GB ({disk['percent']}%)")
À toi de jouer
Exercice 1 — Parsing de logs
Écris un script Python qui parse un fichier de logs nginx, extrait les codes HTTP avec une regex, et affiche un résumé : nombre de 200, 404, 500, etc. Utilise collections.Counter pour le comptage.
Exercice 2 — Health check multi-services
Crée un script qui vérifie la santé de 5 URLs (avec requests.get et un timeout de 5s). Affiche un tableau avec le statut, le temps de réponse, et le code HTTP. Gère les timeouts et erreurs de connexion proprement.
Exercice 3 — Défi bonus : monitoring système avec alertes
Écris un script qui collecte l’usage CPU (psutil.cpu_percent), RAM (psutil.virtual_memory), et disque (psutil.disk_usage). Si un seuil dépasse 80%, écris une alerte dans un fichier JSON horodaté. Configure-le en cron toutes les minutes.
Conclusion
Tu as maintenant les outils pour écrire de vrais scripts d’automatisation DevOps en Python :
- subprocess — exécuter des commandes système, capturer la sortie, gérer les erreurs
- os/sys/pathlib — interaction avec le système de fichiers et l’OS
- requests — appels HTTP, interaction avec des APIs REST
- paramiko — SSH et SFTP en Python
- Scripts concrets — déploiement, health check, parsing de logs, monitoring
Dans le prochain et dernier chapitre, on passe aux SDK cloud : Boto3, Google Cloud, Azure, Docker SDK, et l’intégration avec Terraform et Ansible.
📝 Résumé
pathlibest la façon moderne de manipuler les chemins de fichiers (pasos.path!)subprocess.run()exécute des commandes système avec capture de sortie et gestion d’erreursrequests(ouhttpx) pour les appels API REST — GET, POST, authentificationjsonetyamlpour parser/générer les formats de config DevOpsargparseouclicktransforment un script en CLI professionnellogging>print()— toujours, surtout en production
⚠️ Attention : N’utilise JAMAIS
os.system()oushell=Trueavec des entrées utilisateur — c’est une injection de commande en puissance. Préfèresubprocess.run()avec une liste d’arguments.
➡️ La suite
Prochain chapitre : les SDK cloud en Python — boto3 (AWS), google-cloud, azure-sdk pour piloter ton infrastructure directement depuis Python.
Contenu réservé aux abonnés
Ce chapitre fait partie de la formation complète. Abonne-toi pour débloquer tous les contenus.
Débloquer pour 29 CHF/moisLe chapitre 1 de chaque formation est gratuit.
Série pas encore débloquée
Termine la série prérequise d'abord pour accéder à ce contenu.
Aller à la série prérequiseSérie : Python DevOps
6 / 8Sur cette page
Articles liés
Scripting DevOps : automatisation
subprocess pour exécuter des commandes, os/sys/pathlib pour le filesystem, requests pour les API HTTP, et Paramiko pour le SSH en Python.
Structures de contrôle et fonctions
Conditions, boucles for/while, fonctions, et compréhensions de listes en Python. Tout pour écrire tes premiers scripts DevOps.
Python : les bases du langage
Premiers pas en Python pour le DevOps : variables, types de données, opérateurs, strings, listes, dictionnaires, tuples et sets.