Aller au contenu principal
DataETLDevOpsBackup

Pipelines ETL : Airflow, dbt et Orchestration de Données

30 min de lecture Gestion des Données — Chapitre 2

Construis des pipelines ETL robustes avec Apache Airflow et dbt. DAGs, transformations, monitoring et bonnes pratiques pour des flux de données fiables.

🎯 Objectif : Construire des pipelines de données robustes avec Airflow et dbt, comprendre les patterns ETL/ELT et savoir monitorer tes flux en production. ⏱️ Durée estimée : 60 minutes | Niveau : Avancé

Pourquoi les pipelines ETL sont au cœur du DevOps moderne

Tes données sont propres et bien formatées (cours précédent). Maintenant il faut les déplacer, les transformer et les charger — de façon automatisée, fiable et observable. C’est le rôle de l’ETL : Extract, Transform, Load.

En entreprise, les pipelines de données sont partout : alimentation des dashboards analytiques, synchronisation entre systèmes, calcul de métriques métier, préparation des modèles ML. Un pipeline bien conçu tourne silencieusement en arrière-plan. Un pipeline mal conçu te réveille à 3h du matin.

🔥 Cas réel : Une entreprise e-commerce alimentait son tableau de bord des ventes avec un script Python lancé manuellement par un développeur chaque lundi. Quand le développeur est parti en vacances, personne n’a relancé le script pendant 3 semaines. Les décisions stratégiques étaient basées sur des données obsolètes. L’implémentation d’un DAG Airflow avec alertes Slack a résolu le problème — le pipeline tourne tous les jours à 6h, et l’équipe reçoit une notification si quelque chose échoue.

Ce cours couvre les deux outils de référence : Apache Airflow pour l’orchestration et dbt pour les transformations SQL versionnées.

ETL vs ELT : deux philosophies, un même objectif

Avant de plonger dans les outils, il faut comprendre les deux approches dominantes.

ETL classique : les données sont extraites, transformées avant le chargement, puis chargées dans la destination. C’est l’approche historique, adaptée quand la puissance de calcul du warehouse est limitée ou quand tu veux filtrer les données sensibles avant stockage.

ELT moderne : les données sont extraites, chargées brutes dans le warehouse, puis transformées sur place. C’est devenu la norme avec les warehouses cloud (BigQuery, Snowflake, Redshift) qui ont la puissance de calcul nécessaire.

💡 Tip DevOps : L’ELT est presque toujours le meilleur choix avec un warehouse moderne. Les avantages sont décisifs : les données brutes restent disponibles pour reprocessing, les transformations sont versionnées dans Git (avec dbt), et le debug est plus facile parce que tu peux inspecter les données à chaque étape. L’ETL classique reste pertinent quand tu dois filtrer des données sensibles (RGPD) avant de les stocker, ou quand ta destination n’a pas la puissance de calcul suffisante.

Apache Airflow : orchestrer les workflows

Airflow ne déplace pas les données lui-même — il orchestre quand et dans quel ordre les tâches s’exécutent. Pense à Airflow comme un chef d’orchestre : il ne joue d’aucun instrument, mais il dirige l’ensemble.

Les concepts fondamentaux : un DAG (Directed Acyclic Graph) est un workflow complet. Chaque Task est une unité de travail. Un Operator définit le type de tâche (Python, Bash, SQL…). Un Sensor attend une condition externe. Les XComs permettent la communication entre tâches.

Le déploiement le plus courant utilise Docker Compose avec un PostgreSQL pour les métadonnées, un webserver pour l’interface, et un scheduler pour l’exécution :

# docker-compose.yml — Airflow minimal production-ready
version: '3.8'
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
  airflow-webserver:
    image: apache/airflow:2.9.0
    command: airflow webserver --port 8080
    ports: ["8080:8080"]
    environment: &airflow-env
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
  airflow-scheduler:
    image: apache/airflow:2.9.0
    command: airflow scheduler
    environment: *airflow-env
    volumes:
      - ./dags:/opt/airflow/dags

⚠️ Attention : En production, ne lance jamais Airflow avec le SequentialExecutor par défaut — il n’exécute qu’une tâche à la fois. Utilise LocalExecutor pour un seul serveur, ou CeleryExecutor / KubernetesExecutor pour scaler sur plusieurs workers. Et n’oublie pas l’étape airflow db init + création d’un utilisateur admin avant le premier lancement.

Ton premier DAG : un pipeline ETL complet

Voici un DAG réaliste qui extrait des données d’un CSV, les transforme et les charge — avec gestion d’erreurs, retries et notification :

# dags/etl_user_metrics.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=1),
    'email_on_failure': True,
    'email': ['alerts@lab.dev'],
}

with DAG(
    dag_id='etl_user_metrics',
    default_args=default_args,
    schedule_interval='0 6 * * *',  # Tous les jours à 6h
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['etl', 'production'],
) as dag:

    def extract(**context):
        import pandas as pd
        ds = context['ds']  # Date d'exécution (YYYY-MM-DD)
        df = pd.read_csv(f'/data/exports/users_{ds}.csv')
        assert len(df) > 0, "Fichier vide !"
        context['ti'].xcom_push(key='row_count', value=len(df))

    def transform(**context):
        import pandas as pd
        ds = context['ds']
        df = pd.read_csv(f'/data/exports/users_{ds}.csv')
        df['email'] = df['email'].str.lower().str.strip()
        df = df.dropna(subset=['user_id', 'email']).drop_duplicates('user_id')
        df.to_parquet(f'/data/staging/users_{ds}.parquet', index=False)

    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = BashOperator(
        task_id='load',
        bash_command='python /scripts/load_parquet.py --file /data/staging/users_{{ ds }}.parquet',
    )
    notify = BashOperator(
        task_id='notify',
        bash_command='curl -X POST $SLACK_WEBHOOK -d \'{"text":"✅ ETL terminé pour {{ ds }}"}\'',
    )

    extract_task >> transform_task >> load_task >> notify

🧠 À retenir : Le {{ ds }} dans les templates Airflow est la date d’exécution logique, pas la date actuelle. Pour un DAG schedulé à 0 6 * * *, le run du 21 mars à 6h aura ds = "2026-03-21". C’est ce qui permet le backfill — retraiter des jours passés. Conçois toujours tes DAGs en fonction de ds, jamais de datetime.now().

Patterns avancés : branchement et TaskGroups

Les DAGs réels dépassent le simple pipeline linéaire. Le BranchPythonOperator permet de choisir dynamiquement le chemin d’exécution. Les TaskGroups organisent visuellement les tâches complexes :

from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup

def choose_branch(**context):
    row_count = context['ti'].xcom_pull(task_ids='extract', key='row_count')
    return 'heavy_processing' if row_count > 10000 else 'light_processing'

branch = BranchPythonOperator(task_id='branch_on_size', python_callable=choose_branch)

# TaskGroup pour regrouper les checks de qualité
with TaskGroup('quality_checks') as quality:
    check_nulls = PythonOperator(task_id='check_nulls', python_callable=...)
    check_dupes = PythonOperator(task_id='check_dupes', python_callable=...)

dbt : les transformations SQL comme du code

dbt (data build tool) est la réponse à un problème classique : des requêtes SQL de transformation éparpillées dans des scripts, sans versioning, sans tests, sans documentation. dbt applique les pratiques du software engineering aux transformations SQL.

Le principe : tu écris des modèles SQL (des SELECT), dbt se charge du CREATE TABLE / CREATE VIEW, de la gestion des dépendances entre modèles, des tests et de la documentation auto-générée.

La structure d’un projet dbt suit trois couches : staging (nettoyage des données brutes), intermediate (logique métier intermédiaire), marts (métriques finales exposées aux consommateurs).

-- models/staging/stg_users.sql — nettoyage brut
{{ config(materialized='view') }}

select
    id as user_id,
    lower(trim(email)) as email,
    coalesce(name, 'Unknown') as name,
    created_at::timestamp as created_at
from {{ source('raw', 'users') }}
where id is not null and email is not null
-- models/marts/fct_user_activity.sql — métriques finales
{{ config(materialized='incremental', unique_key='user_id') }}

select
    u.user_id, u.email,
    count(distinct s.session_id) as total_sessions,
    max(s.session_end) as last_active_at
from {{ ref('stg_users') }} u
left join {{ ref('int_user_sessions') }} s on u.user_id = s.user_id
{% if is_incremental() %}
where s.session_start > (select max(last_active_at) from {{ this }})
{% endif %}
group by u.user_id, u.email

💡 Tip DevOps : Les trois matérialisations dbt à connaître : view (staging — léger, toujours à jour), table (intermediate — données précalculées, rebuild complet), incremental (marts — ne traite que les nouvelles données, essentiel pour les gros volumes). Le {{ ref('model_name') }} crée automatiquement les dépendances entre modèles — dbt sait dans quel ordre les exécuter.

Les tests dbt sont déclaratifs et puissants. Tu les définis en YAML (tests génériques) ou en SQL (tests custom) :

# models/staging/schema.yml
models:
  - name: stg_users
    columns:
      - name: user_id
        tests: [unique, not_null]
      - name: email
        tests: [unique, not_null]
dbt run                    # Build tous les modèles
dbt test                   # Lance tous les tests
dbt docs generate && dbt docs serve  # Documentation auto-générée

Bonnes pratiques et pièges des pipelines

L’idempotence est non-négociable. Un pipeline relancé deux fois doit produire le même résultat. Utilise des UPSERT / MERGE au lieu de simples INSERT, et conçois tes DAGs Airflow autour de la date logique ds :

-- UPSERT PostgreSQL — idempotent par design
INSERT INTO metrics (user_id, date, value)
VALUES ($1, $2, $3)
ON CONFLICT (user_id, date)
DO UPDATE SET value = EXCLUDED.value;

Le monitoring sépare un pipeline jouet d’un pipeline production. Chaque pipeline doit exposer trois métriques minimum : le timestamp du dernier succès, la durée d’exécution, et le nombre de lignes traitées. Alerte si le pipeline n’a pas tourné depuis 24h ou si la durée dépasse le seuil normal.

Les dead letter queues sauvent les données. Les enregistrements qui échouent au processing ne doivent jamais être silencieusement perdus — redirige-les vers une file d’erreur pour analyse et reprocessing :

def process_with_dlq(records):
    success, failed = [], []
    for record in records:
        try:
            success.append(transform(record))
        except Exception as e:
            record['_error'] = str(e)
            failed.append(record)
    if failed:
        write_to_dlq(failed)  # Stocke pour reprocessing
    return success

⚠️ Attention : Les trois pièges les plus courants en pipelines ETL. 1) Dépendre de datetime.now() au lieu de la date logique — impossible de faire un backfill propre. 2) Pas de tests sur les données — tu découvres les problèmes quand le dashboard affiche des absurdités. 3) Pas de retry ni d’alerting — le pipeline échoue silencieusement et personne ne s’en rend compte pendant des jours.

🔥 Cas réel : Une équipe data avait un pipeline Airflow qui échouait 2-3 fois par semaine à cause de timeouts réseau sur l’API source. Ils corrigeaient manuellement à chaque fois. L’ajout de retries: 3 avec retry_delay: timedelta(minutes=5) dans les default_args a résolu 95% des échecs. Les 5% restants déclenchent maintenant une alerte Slack au lieu de passer inaperçus.

Résumé

Un pipeline ETL robuste repose sur trois piliers : l’orchestration (Airflow gère le quand, l’ordre et les retries), les transformations versionnées (dbt structure le SQL avec tests et documentation), et l’observabilité (métriques, alertes, dead letter queues).

Conçois toujours tes pipelines idempotents (relançables sans effet de bord), paramétrés par date logique (backfill possible), et observables (tu sais en temps réel ce qui tourne, ce qui échoue, et pourquoi). Ces trois principes s’appliquent que tu utilises Airflow, Prefect, Dagster, ou un simple cron — les outils changent, les principes restent.


À toi de jouer

Exercice 1 — DAG Airflow simple Crée un DAG avec 3 tasks : extract (lire un CSV), transform (filtrer et nettoyer avec Python), load (écrire en JSON). Configure les dépendances avec >>, ajoute retries: 2 et une notification en cas d’échec.

Exercice 2 — Pipeline dbt Initialise un projet dbt (dbt init). Crée un modèle staging qui nettoie des données de logs (lowercase, trim, drop nulls), et un modèle mart qui agrège par jour. Ajoute des tests not_null et unique sur la date. Lance avec dbt run && dbt test.

Exercice 3 — Monitoring de pipeline Écris un script Python qui simule un pipeline ETL : 1000 records, 5% de taux d’erreur aléatoire, dead letter queue pour les échecs. Mesure la durée de chaque étape et le nombre de lignes traitées. Écris un rapport JSON avec les métriques et un check qui alerte si le taux d’erreur dépasse le seuil.

Articles liés