🎯 Objectif : Construire des pipelines de données robustes avec Airflow et dbt, comprendre les patterns ETL/ELT et savoir monitorer tes flux en production. ⏱️ Durée estimée : 60 minutes | Niveau : Avancé
Pourquoi les pipelines ETL sont au cœur du DevOps moderne
Tes données sont propres et bien formatées (cours précédent). Maintenant il faut les déplacer, les transformer et les charger — de façon automatisée, fiable et observable. C’est le rôle de l’ETL : Extract, Transform, Load.
En entreprise, les pipelines de données sont partout : alimentation des dashboards analytiques, synchronisation entre systèmes, calcul de métriques métier, préparation des modèles ML. Un pipeline bien conçu tourne silencieusement en arrière-plan. Un pipeline mal conçu te réveille à 3h du matin.
🔥 Cas réel : Une entreprise e-commerce alimentait son tableau de bord des ventes avec un script Python lancé manuellement par un développeur chaque lundi. Quand le développeur est parti en vacances, personne n’a relancé le script pendant 3 semaines. Les décisions stratégiques étaient basées sur des données obsolètes. L’implémentation d’un DAG Airflow avec alertes Slack a résolu le problème — le pipeline tourne tous les jours à 6h, et l’équipe reçoit une notification si quelque chose échoue.
Ce cours couvre les deux outils de référence : Apache Airflow pour l’orchestration et dbt pour les transformations SQL versionnées.
ETL vs ELT : deux philosophies, un même objectif
Avant de plonger dans les outils, il faut comprendre les deux approches dominantes.
ETL classique : les données sont extraites, transformées avant le chargement, puis chargées dans la destination. C’est l’approche historique, adaptée quand la puissance de calcul du warehouse est limitée ou quand tu veux filtrer les données sensibles avant stockage.
ELT moderne : les données sont extraites, chargées brutes dans le warehouse, puis transformées sur place. C’est devenu la norme avec les warehouses cloud (BigQuery, Snowflake, Redshift) qui ont la puissance de calcul nécessaire.
💡 Tip DevOps : L’ELT est presque toujours le meilleur choix avec un warehouse moderne. Les avantages sont décisifs : les données brutes restent disponibles pour reprocessing, les transformations sont versionnées dans Git (avec dbt), et le debug est plus facile parce que tu peux inspecter les données à chaque étape. L’ETL classique reste pertinent quand tu dois filtrer des données sensibles (RGPD) avant de les stocker, ou quand ta destination n’a pas la puissance de calcul suffisante.
Apache Airflow : orchestrer les workflows
Airflow ne déplace pas les données lui-même — il orchestre quand et dans quel ordre les tâches s’exécutent. Pense à Airflow comme un chef d’orchestre : il ne joue d’aucun instrument, mais il dirige l’ensemble.
Les concepts fondamentaux : un DAG (Directed Acyclic Graph) est un workflow complet. Chaque Task est une unité de travail. Un Operator définit le type de tâche (Python, Bash, SQL…). Un Sensor attend une condition externe. Les XComs permettent la communication entre tâches.
Le déploiement le plus courant utilise Docker Compose avec un PostgreSQL pour les métadonnées, un webserver pour l’interface, et un scheduler pour l’exécution :
# docker-compose.yml — Airflow minimal production-ready
version: '3.8'
services:
postgres:
image: postgres:16
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
airflow-webserver:
image: apache/airflow:2.9.0
command: airflow webserver --port 8080
ports: ["8080:8080"]
environment: &airflow-env
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
airflow-scheduler:
image: apache/airflow:2.9.0
command: airflow scheduler
environment: *airflow-env
volumes:
- ./dags:/opt/airflow/dags
⚠️ Attention : En production, ne lance jamais Airflow avec le SequentialExecutor par défaut — il n’exécute qu’une tâche à la fois. Utilise LocalExecutor pour un seul serveur, ou CeleryExecutor / KubernetesExecutor pour scaler sur plusieurs workers. Et n’oublie pas l’étape airflow db init + création d’un utilisateur admin avant le premier lancement.
Ton premier DAG : un pipeline ETL complet
Voici un DAG réaliste qui extrait des données d’un CSV, les transforme et les charge — avec gestion d’erreurs, retries et notification :
# dags/etl_user_metrics.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=1),
'email_on_failure': True,
'email': ['alerts@lab.dev'],
}
with DAG(
dag_id='etl_user_metrics',
default_args=default_args,
schedule_interval='0 6 * * *', # Tous les jours à 6h
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['etl', 'production'],
) as dag:
def extract(**context):
import pandas as pd
ds = context['ds'] # Date d'exécution (YYYY-MM-DD)
df = pd.read_csv(f'/data/exports/users_{ds}.csv')
assert len(df) > 0, "Fichier vide !"
context['ti'].xcom_push(key='row_count', value=len(df))
def transform(**context):
import pandas as pd
ds = context['ds']
df = pd.read_csv(f'/data/exports/users_{ds}.csv')
df['email'] = df['email'].str.lower().str.strip()
df = df.dropna(subset=['user_id', 'email']).drop_duplicates('user_id')
df.to_parquet(f'/data/staging/users_{ds}.parquet', index=False)
extract_task = PythonOperator(task_id='extract', python_callable=extract)
transform_task = PythonOperator(task_id='transform', python_callable=transform)
load_task = BashOperator(
task_id='load',
bash_command='python /scripts/load_parquet.py --file /data/staging/users_{{ ds }}.parquet',
)
notify = BashOperator(
task_id='notify',
bash_command='curl -X POST $SLACK_WEBHOOK -d \'{"text":"✅ ETL terminé pour {{ ds }}"}\'',
)
extract_task >> transform_task >> load_task >> notify
🧠 À retenir : Le {{ ds }} dans les templates Airflow est la date d’exécution logique, pas la date actuelle. Pour un DAG schedulé à 0 6 * * *, le run du 21 mars à 6h aura ds = "2026-03-21". C’est ce qui permet le backfill — retraiter des jours passés. Conçois toujours tes DAGs en fonction de ds, jamais de datetime.now().
Patterns avancés : branchement et TaskGroups
Les DAGs réels dépassent le simple pipeline linéaire. Le BranchPythonOperator permet de choisir dynamiquement le chemin d’exécution. Les TaskGroups organisent visuellement les tâches complexes :
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup
def choose_branch(**context):
row_count = context['ti'].xcom_pull(task_ids='extract', key='row_count')
return 'heavy_processing' if row_count > 10000 else 'light_processing'
branch = BranchPythonOperator(task_id='branch_on_size', python_callable=choose_branch)
# TaskGroup pour regrouper les checks de qualité
with TaskGroup('quality_checks') as quality:
check_nulls = PythonOperator(task_id='check_nulls', python_callable=...)
check_dupes = PythonOperator(task_id='check_dupes', python_callable=...)
dbt : les transformations SQL comme du code
dbt (data build tool) est la réponse à un problème classique : des requêtes SQL de transformation éparpillées dans des scripts, sans versioning, sans tests, sans documentation. dbt applique les pratiques du software engineering aux transformations SQL.
Le principe : tu écris des modèles SQL (des SELECT), dbt se charge du CREATE TABLE / CREATE VIEW, de la gestion des dépendances entre modèles, des tests et de la documentation auto-générée.
La structure d’un projet dbt suit trois couches : staging (nettoyage des données brutes), intermediate (logique métier intermédiaire), marts (métriques finales exposées aux consommateurs).
-- models/staging/stg_users.sql — nettoyage brut
{{ config(materialized='view') }}
select
id as user_id,
lower(trim(email)) as email,
coalesce(name, 'Unknown') as name,
created_at::timestamp as created_at
from {{ source('raw', 'users') }}
where id is not null and email is not null
-- models/marts/fct_user_activity.sql — métriques finales
{{ config(materialized='incremental', unique_key='user_id') }}
select
u.user_id, u.email,
count(distinct s.session_id) as total_sessions,
max(s.session_end) as last_active_at
from {{ ref('stg_users') }} u
left join {{ ref('int_user_sessions') }} s on u.user_id = s.user_id
{% if is_incremental() %}
where s.session_start > (select max(last_active_at) from {{ this }})
{% endif %}
group by u.user_id, u.email
💡 Tip DevOps : Les trois matérialisations dbt à connaître : view (staging — léger, toujours à jour), table (intermediate — données précalculées, rebuild complet), incremental (marts — ne traite que les nouvelles données, essentiel pour les gros volumes). Le {{ ref('model_name') }} crée automatiquement les dépendances entre modèles — dbt sait dans quel ordre les exécuter.
Les tests dbt sont déclaratifs et puissants. Tu les définis en YAML (tests génériques) ou en SQL (tests custom) :
# models/staging/schema.yml
models:
- name: stg_users
columns:
- name: user_id
tests: [unique, not_null]
- name: email
tests: [unique, not_null]
dbt run # Build tous les modèles
dbt test # Lance tous les tests
dbt docs generate && dbt docs serve # Documentation auto-générée
Bonnes pratiques et pièges des pipelines
L’idempotence est non-négociable. Un pipeline relancé deux fois doit produire le même résultat. Utilise des UPSERT / MERGE au lieu de simples INSERT, et conçois tes DAGs Airflow autour de la date logique ds :
-- UPSERT PostgreSQL — idempotent par design
INSERT INTO metrics (user_id, date, value)
VALUES ($1, $2, $3)
ON CONFLICT (user_id, date)
DO UPDATE SET value = EXCLUDED.value;
Le monitoring sépare un pipeline jouet d’un pipeline production. Chaque pipeline doit exposer trois métriques minimum : le timestamp du dernier succès, la durée d’exécution, et le nombre de lignes traitées. Alerte si le pipeline n’a pas tourné depuis 24h ou si la durée dépasse le seuil normal.
Les dead letter queues sauvent les données. Les enregistrements qui échouent au processing ne doivent jamais être silencieusement perdus — redirige-les vers une file d’erreur pour analyse et reprocessing :
def process_with_dlq(records):
success, failed = [], []
for record in records:
try:
success.append(transform(record))
except Exception as e:
record['_error'] = str(e)
failed.append(record)
if failed:
write_to_dlq(failed) # Stocke pour reprocessing
return success
⚠️ Attention : Les trois pièges les plus courants en pipelines ETL. 1) Dépendre de datetime.now() au lieu de la date logique — impossible de faire un backfill propre. 2) Pas de tests sur les données — tu découvres les problèmes quand le dashboard affiche des absurdités. 3) Pas de retry ni d’alerting — le pipeline échoue silencieusement et personne ne s’en rend compte pendant des jours.
🔥 Cas réel : Une équipe data avait un pipeline Airflow qui échouait 2-3 fois par semaine à cause de timeouts réseau sur l’API source. Ils corrigeaient manuellement à chaque fois. L’ajout de retries: 3 avec retry_delay: timedelta(minutes=5) dans les default_args a résolu 95% des échecs. Les 5% restants déclenchent maintenant une alerte Slack au lieu de passer inaperçus.
Résumé
Un pipeline ETL robuste repose sur trois piliers : l’orchestration (Airflow gère le quand, l’ordre et les retries), les transformations versionnées (dbt structure le SQL avec tests et documentation), et l’observabilité (métriques, alertes, dead letter queues).
Conçois toujours tes pipelines idempotents (relançables sans effet de bord), paramétrés par date logique (backfill possible), et observables (tu sais en temps réel ce qui tourne, ce qui échoue, et pourquoi). Ces trois principes s’appliquent que tu utilises Airflow, Prefect, Dagster, ou un simple cron — les outils changent, les principes restent.
À toi de jouer
Exercice 1 — DAG Airflow simple
Crée un DAG avec 3 tasks : extract (lire un CSV), transform (filtrer et nettoyer avec Python), load (écrire en JSON). Configure les dépendances avec >>, ajoute retries: 2 et une notification en cas d’échec.
Exercice 2 — Pipeline dbt
Initialise un projet dbt (dbt init). Crée un modèle staging qui nettoie des données de logs (lowercase, trim, drop nulls), et un modèle mart qui agrège par jour. Ajoute des tests not_null et unique sur la date. Lance avec dbt run && dbt test.
Exercice 3 — Monitoring de pipeline Écris un script Python qui simule un pipeline ETL : 1000 records, 5% de taux d’erreur aléatoire, dead letter queue pour les échecs. Mesure la durée de chaque étape et le nombre de lignes traitées. Écris un rapport JSON avec les métriques et un check qui alerte si le taux d’erreur dépasse le seuil.
Contenu réservé aux abonnés
Ce chapitre fait partie de la formation complète. Abonne-toi pour débloquer tous les contenus.
Débloquer pour 29 CHF/moisLe chapitre 1 de chaque formation est gratuit.
Série pas encore débloquée
Termine la série prérequise d'abord pour accéder à ce contenu.
Aller à la série prérequiseSérie : Gestion des Données
2 / 4Sur cette page
Articles liés
Fondamentaux des Données : Types, Formats et Sérialisation
Maîtrise les types de données, les formats d'échange (JSON, CSV, Parquet, Avro) et les mécanismes de sérialisation pour construire des pipelines robustes.
Stratégies de Backup : Restic, BorgBackup, Velero et Disaster Recovery
Implémente des stratégies de backup solides avec la règle 3-2-1, restic, borgbackup et Velero pour Kubernetes. Prépare ton disaster recovery plan.
Migrations Zero-Downtime : Flyway, Liquibase et Blue-Green DB
Maîtrise les migrations de bases de données sans interruption de service avec Flyway, Liquibase, et les stratégies blue-green pour tes déploiements.