Aller au contenu principal
PythonDevOpsAutomatisationMonitoring

Parsing de logs et monitoring

30 min de lecture Python DevOps — Chapitre 6

Scripts DevOps concrets : déploiement automatisé, health checks, parsing de logs, et monitoring système avec Python.

Dans le chapitre précédent, on a vu les outils Python pour l’automatisation : subprocess, os/pathlib, requests et Paramiko. Maintenant, on assemble tout dans des scripts DevOps concrets.

🎯 Objectif : À la fin de ce chapitre, tu maîtriseras les concepts présentés ci-dessous.

Scripts DevOps concrets

Script de déploiement

#!/usr/bin/env python3
"""
Script de déploiement automatisé.
Usage: python deploy.py --service api --env production --version v2.1.0
"""

import subprocess
import sys
import json
import logging
from datetime import datetime
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("deploy")


class Deployer:
    def __init__(self, service, env, version):
        self.service = service
        self.env = env
        self.version = version
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    def run_cmd(self, cmd, check=True):
        """Exécute une commande et retourne la sortie."""
        logger.info(f"$ {cmd}")
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=check)
        if result.stdout:
            logger.debug(result.stdout[:200])
        return result

    def pre_checks(self):
        """Vérifications avant déploiement."""
        logger.info("🔍 Vérifications pré-déploiement...")

        # Vérifier que l'image existe
        result = self.run_cmd(
            f"docker manifest inspect registry.example.com/{self.service}:{self.version}",
            check=False
        )
        if result.returncode != 0:
            raise RuntimeError(f"Image {self.service}:{self.version} introuvable")

        # Vérifier la connexion au cluster
        self.run_cmd("kubectl cluster-info")
        logger.info("✅ Pré-checks OK")

    def backup(self):
        """Sauvegarde la configuration actuelle."""
        logger.info("💾 Backup de la config actuelle...")
        backup_dir = Path(f"/tmp/deploy-backups/{self.timestamp}")
        backup_dir.mkdir(parents=True, exist_ok=True)

        self.run_cmd(
            f"kubectl get deployment {self.service} -n {self.env} -o yaml > {backup_dir}/deployment.yaml",
            check=False
        )
        return backup_dir

    def deploy(self):
        """Exécute le déploiement."""
        logger.info(f"🚀 Déploiement de {self.service}:{self.version} en {self.env}...")

        self.run_cmd(
            f"kubectl set image deployment/{self.service} "
            f"{self.service}=registry.example.com/{self.service}:{self.version} "
            f"-n {self.env}"
        )

        # Attendre que le rollout soit terminé
        self.run_cmd(
            f"kubectl rollout status deployment/{self.service} -n {self.env} --timeout=300s"
        )

    def post_checks(self):
        """Vérifications post-déploiement."""
        logger.info("🔍 Vérifications post-déploiement...")

        result = self.run_cmd(
            f"kubectl get pods -n {self.env} -l app={self.service} -o json"
        )
        pods = json.loads(result.stdout)
        running = [
            p for p in pods["items"]
            if p["status"]["phase"] == "Running"
        ]

        logger.info(f"  Pods running : {len(running)}/{len(pods['items'])}")

        if len(running) == 0:
            raise RuntimeError("Aucun pod en running après déploiement")

    def rollback(self):
        """Rollback en cas d'erreur."""
        logger.warning("⏪ Rollback en cours...")
        self.run_cmd(
            f"kubectl rollout undo deployment/{self.service} -n {self.env}"
        )

    def run(self):
        """Exécute le pipeline de déploiement complet."""
        try:
            self.pre_checks()
            self.backup()
            self.deploy()
            self.post_checks()
            logger.info(f"✅ Déploiement réussi : {self.service}:{self.version} en {self.env}")
        except Exception as e:
            logger.error(f"❌ Déploiement échoué : {e}")
            self.rollback()
            sys.exit(1)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Déploiement automatisé")
    parser.add_argument("--service", required=True, help="Nom du service")
    parser.add_argument("--env", required=True, choices=["staging", "production"])
    parser.add_argument("--version", required=True, help="Version/tag de l'image")

    args = parser.parse_args()

    deployer = Deployer(args.service, args.env, args.version)
    deployer.run()

Script de health check

#!/usr/bin/env python3
"""
Health check multi-services avec alerting.
"""

import requests
import json
import time
import logging
from datetime import datetime
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("healthcheck")

# Configuration des services à vérifier
SERVICES = [
    {"name": "API Gateway", "url": "https://api.example.com/health", "timeout": 5},
    {"name": "Frontend", "url": "https://www.example.com", "timeout": 10},
    {"name": "Auth Service", "url": "https://auth.example.com/health", "timeout": 5},
    {"name": "Prometheus", "url": "http://monitoring.internal:9090/-/healthy", "timeout": 5},
    {"name": "Grafana", "url": "http://monitoring.internal:3000/api/health", "timeout": 5},
]


def check_http(name, url, timeout=5, expected_status=200):
    """Vérifie un endpoint HTTP."""
    start = time.time()
    try:
        response = requests.get(url, timeout=timeout, allow_redirects=True)
        elapsed = (time.time() - start) * 1000  # en ms

        return {
            "name": name,
            "url": url,
            "status": "healthy" if response.status_code == expected_status else "degraded",
            "http_code": response.status_code,
            "response_time_ms": round(elapsed, 2),
            "timestamp": datetime.now().isoformat()
        }
    except requests.ConnectionError:
        return {"name": name, "url": url, "status": "down", "error": "Connection refused",
                "timestamp": datetime.now().isoformat()}
    except requests.Timeout:
        return {"name": name, "url": url, "status": "timeout", "error": f"Timeout ({timeout}s)",
                "timestamp": datetime.now().isoformat()}
    except Exception as e:
        return {"name": name, "url": url, "status": "error", "error": str(e),
                "timestamp": datetime.now().isoformat()}


def send_alert(service_result):
    """Envoie une alerte (webhook Slack, email, etc.)."""
    # Exemple avec un webhook Slack
    webhook_url = "https://hooks.slack.com/services/xxx/yyy/zzz"
    payload = {
        "text": f"🚨 *{service_result['name']}* is *{service_result['status']}*\n"
                f"URL: {service_result['url']}\n"
                f"Error: {service_result.get('error', 'N/A')}"
    }
    try:
        requests.post(webhook_url, json=payload, timeout=5)
    except Exception:
        logger.error("Impossible d'envoyer l'alerte Slack")


def run_health_checks():
    """Exécute tous les health checks."""
    results = []

    for svc in SERVICES:
        result = check_http(svc["name"], svc["url"], svc.get("timeout", 5))
        results.append(result)

        # Log
        if result["status"] == "healthy":
            logger.info(f"✅ {result['name']}: {result['status']} ({result.get('response_time_ms', '?')}ms)")
        else:
            logger.warning(f"❌ {result['name']}: {result['status']}{result.get('error', '')}")
            send_alert(result)

    # Sauvegarder le rapport
    report_path = Path("/tmp/health_report.json")
    report_path.write_text(json.dumps(results, indent=2))

    # Résumé
    healthy = sum(1 for r in results if r["status"] == "healthy")
    total = len(results)
    logger.info(f"\n📊 Résultat : {healthy}/{total} services healthy")

    return results


if __name__ == "__main__":
    run_health_checks()

Script de parsing de logs

#!/usr/bin/env python3
"""
Analyseur de logs Nginx — extrait les métriques clés.
"""

import re
import json
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path

# Pattern pour les logs Nginx au format combined
NGINX_PATTERN = re.compile(
    r'(?P<ip>\S+) - \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" '
    r'(?P<status>\d{3}) (?P<size>\d+) '
    r'"(?P<referrer>[^"]*)" "(?P<user_agent>[^"]*)"'
)


def parse_nginx_log(log_path, limit=None):
    """Parse un fichier de log Nginx et retourne des statistiques."""
    stats = {
        "total_requests": 0,
        "status_codes": Counter(),
        "methods": Counter(),
        "top_paths": Counter(),
        "top_ips": Counter(),
        "errors": [],
        "total_bytes": 0,
        "response_codes_by_path": defaultdict(Counter),
    }

    with open(log_path, "r") as f:
        for i, line in enumerate(f):
            if limit and i >= limit:
                break

            match = NGINX_PATTERN.match(line.strip())
            if not match:
                continue

            data = match.groupdict()
            stats["total_requests"] += 1
            stats["status_codes"][data["status"]] += 1
            stats["methods"][data["method"]] += 1
            stats["top_paths"][data["path"]] += 1
            stats["top_ips"][data["ip"]] += 1
            stats["total_bytes"] += int(data["size"])
            stats["response_codes_by_path"][data["path"]][data["status"]] += 1

            # Collecter les erreurs 5xx
            if data["status"].startswith("5"):
                stats["errors"].append({
                    "ip": data["ip"],
                    "path": data["path"],
                    "status": data["status"],
                    "time": data["time"],
                    "user_agent": data["user_agent"]
                })

    return stats


def print_report(stats):
    """Affiche un rapport lisible."""
    print("=" * 60)
    print("📊 RAPPORT D'ANALYSE DES LOGS NGINX")
    print("=" * 60)

    print(f"\n📈 Total de requêtes : {stats['total_requests']}")
    print(f"📦 Données transférées : {stats['total_bytes'] / 1024 / 1024:.2f} MB")

    print("\n📋 Codes de statut :")
    for code, count in stats["status_codes"].most_common():
        pct = count / stats["total_requests"] * 100
        bar = "" * int(pct / 2)
        print(f"  {code}: {count:>6} ({pct:5.1f}%) {bar}")

    print("\n🔝 Top 10 des pages les plus visitées :")
    for path, count in stats["top_paths"].most_common(10):
        print(f"  {count:>6}{path}")

    print("\n🌐 Top 10 des IPs :")
    for ip, count in stats["top_ips"].most_common(10):
        print(f"  {count:>6}{ip}")

    print(f"\n🔴 Erreurs 5xx : {len(stats['errors'])}")
    for err in stats["errors"][:5]:
        print(f"  [{err['time']}] {err['status']} {err['path']}{err['ip']}")

    print("=" * 60)


if __name__ == "__main__":
    import sys

    log_file = sys.argv[1] if len(sys.argv) > 1 else "/var/log/nginx/access.log"

    if not Path(log_file).exists():
        print(f"❌ Fichier non trouvé : {log_file}")
        sys.exit(1)

    stats = parse_nginx_log(log_file)
    print_report(stats)

    # Export JSON
    export = {k: v for k, v in stats.items() if k != "response_codes_by_path"}
    export["status_codes"] = dict(export["status_codes"])
    export["methods"] = dict(export["methods"])
    export["top_paths"] = dict(export["top_paths"].most_common(20))
    export["top_ips"] = dict(export["top_ips"].most_common(20))

    output = Path("/tmp/nginx_analysis.json")
    output.write_text(json.dumps(export, indent=2, default=str))
    print(f"\n💾 Rapport exporté dans {output}")

Bonus : monitoring système rapide

#!/usr/bin/env python3
"""
Collecte des métriques système basiques (sans dépendance externe).
"""

import os
import subprocess
from pathlib import Path


def get_cpu_usage():
    """Retourne l'utilisation CPU moyenne sur 1 seconde."""
    with open("/proc/stat") as f:
        line1 = f.readline().split()
    import time
    time.sleep(1)
    with open("/proc/stat") as f:
        line2 = f.readline().split()

    idle1 = int(line1[4])
    idle2 = int(line2[4])
    total1 = sum(int(x) for x in line1[1:])
    total2 = sum(int(x) for x in line2[1:])

    idle_delta = idle2 - idle1
    total_delta = total2 - total1

    return round((1 - idle_delta / total_delta) * 100, 1)


def get_memory_usage():
    """Retourne l'utilisation mémoire."""
    with open("/proc/meminfo") as f:
        lines = f.readlines()

    mem = {}
    for line in lines:
        parts = line.split()
        mem[parts[0].rstrip(":")] = int(parts[1])

    total = mem["MemTotal"]
    available = mem["MemAvailable"]
    used = total - available

    return {
        "total_mb": round(total / 1024, 0),
        "used_mb": round(used / 1024, 0),
        "available_mb": round(available / 1024, 0),
        "percent": round(used / total * 100, 1)
    }


def get_disk_usage(mount="/"):
    """Retourne l'utilisation disque."""
    stat = os.statvfs(mount)
    total = stat.f_blocks * stat.f_frsize
    free = stat.f_bfree * stat.f_frsize
    used = total - free

    return {
        "total_gb": round(total / 1024**3, 1),
        "used_gb": round(used / 1024**3, 1),
        "free_gb": round(free / 1024**3, 1),
        "percent": round(used / total * 100, 1)
    }


if __name__ == "__main__":
    print("🖥️  Métriques système")
    print(f"  CPU : {get_cpu_usage()}%")

    mem = get_memory_usage()
    print(f"  RAM : {mem['used_mb']}/{mem['total_mb']} MB ({mem['percent']}%)")

    disk = get_disk_usage("/")
    print(f"  Disk: {disk['used_gb']}/{disk['total_gb']} GB ({disk['percent']}%)")

À toi de jouer

Exercice 1 — Parsing de logs

Écris un script Python qui parse un fichier de logs nginx, extrait les codes HTTP avec une regex, et affiche un résumé : nombre de 200, 404, 500, etc. Utilise collections.Counter pour le comptage.

Exercice 2 — Health check multi-services

Crée un script qui vérifie la santé de 5 URLs (avec requests.get et un timeout de 5s). Affiche un tableau avec le statut, le temps de réponse, et le code HTTP. Gère les timeouts et erreurs de connexion proprement.

Exercice 3 — Défi bonus : monitoring système avec alertes

Écris un script qui collecte l’usage CPU (psutil.cpu_percent), RAM (psutil.virtual_memory), et disque (psutil.disk_usage). Si un seuil dépasse 80%, écris une alerte dans un fichier JSON horodaté. Configure-le en cron toutes les minutes.


Conclusion

Tu as maintenant les outils pour écrire de vrais scripts d’automatisation DevOps en Python :

  • subprocess — exécuter des commandes système, capturer la sortie, gérer les erreurs
  • os/sys/pathlib — interaction avec le système de fichiers et l’OS
  • requests — appels HTTP, interaction avec des APIs REST
  • paramiko — SSH et SFTP en Python
  • Scripts concrets — déploiement, health check, parsing de logs, monitoring

Dans le prochain et dernier chapitre, on passe aux SDK cloud : Boto3, Google Cloud, Azure, Docker SDK, et l’intégration avec Terraform et Ansible.

📝 Résumé

  • pathlib est la façon moderne de manipuler les chemins de fichiers (pas os.path !)
  • subprocess.run() exécute des commandes système avec capture de sortie et gestion d’erreurs
  • requests (ou httpx) pour les appels API REST — GET, POST, authentification
  • json et yaml pour parser/générer les formats de config DevOps
  • argparse ou click transforment un script en CLI professionnel
  • logging > print() — toujours, surtout en production

⚠️ Attention : N’utilise JAMAIS os.system() ou shell=True avec des entrées utilisateur — c’est une injection de commande en puissance. Préfère subprocess.run() avec une liste d’arguments.

➡️ La suite

Prochain chapitre : les SDK cloud en Python — boto3 (AWS), google-cloud, azure-sdk pour piloter ton infrastructure directement depuis Python.

Articles liés