Le problème des mocks en intégration
Les tests unitaires mockent tout. C’est bien pour la rapidité et l’isolation, mais tu ne testes jamais la vraie interaction :
# ❌ Ce test passe, mais est-ce que la vraie requête SQL marche ?
def test_get_user(mocker):
mock_db = mocker.MagicMock()
mock_db.query.return_value.filter_by.return_value.first.return_value = User(
id=1, name="John"
)
service = UserService(mock_db)
user = service.get_by_id(1)
assert user.name == "John" # ← Le mock retourne ce que TU lui dis
Ce test ne vérifie pas :
- Que la requête SQL est correcte
- Que le mapping ORM fonctionne
- Que les types sont compatibles
- Que les contraintes de la DB sont respectées
Pour ça, tu as besoin de vrais tests d’intégration avec de vraies bases de données.
Testcontainers : des conteneurs Docker pour tes tests
testcontainers-python démarre des conteneurs Docker à la demande pour tes tests. Tu testes contre une vraie PostgreSQL, un vrai Redis, un vrai MongoDB — pas un mock.
Installation
pip install testcontainers[postgres,redis,mongodb]
Prérequis : Docker doit tourner sur la machine qui exécute les tests.
PostgreSQL avec testcontainers
# tests/integration/conftest.py
import pytest
from testcontainers.postgres import PostgresContainer
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from myapp.models import Base
@pytest.fixture(scope="session")
def postgres_container():
"""Démarre un conteneur PostgreSQL pour toute la session de tests."""
with PostgresContainer("postgres:16") as postgres:
yield postgres
@pytest.fixture(scope="session")
def db_engine(postgres_container):
"""Crée un engine SQLAlchemy connecté au conteneur."""
engine = create_engine(postgres_container.get_connection_url())
Base.metadata.create_all(engine)
yield engine
engine.dispose()
@pytest.fixture
def db_session(db_engine):
"""Crée une session avec rollback après chaque test."""
connection = db_engine.connect()
transaction = connection.begin()
Session = sessionmaker(bind=connection)
session = Session()
yield session
session.close()
transaction.rollback()
connection.close()
# tests/integration/test_user_repository.py
from myapp.models import User
from myapp.repositories import UserRepository
def test_create_user(db_session):
repo = UserRepository(db_session)
user = repo.create(name="Alice", email="alice@example.com")
assert user.id is not None
assert user.name == "Alice"
# Vérifie en DB directement
saved = db_session.get(User, user.id)
assert saved.email == "alice@example.com"
def test_find_user_by_email(db_session):
repo = UserRepository(db_session)
repo.create(name="Bob", email="bob@example.com")
found = repo.find_by_email("bob@example.com")
assert found is not None
assert found.name == "Bob"
def test_find_nonexistent_user(db_session):
repo = UserRepository(db_session)
found = repo.find_by_email("ghost@example.com")
assert found is None
def test_unique_email_constraint(db_session):
repo = UserRepository(db_session)
repo.create(name="Charlie", email="charlie@example.com")
with pytest.raises(IntegrityError):
repo.create(name="Charlie 2", email="charlie@example.com")
Redis avec testcontainers
# tests/integration/conftest.py
from testcontainers.redis import RedisContainer
import redis
@pytest.fixture(scope="session")
def redis_container():
with RedisContainer("redis:7-alpine") as container:
yield container
@pytest.fixture
def redis_client(redis_container):
client = redis.Redis(
host=redis_container.get_container_host_ip(),
port=redis_container.get_exposed_port(6379),
decode_responses=True,
)
yield client
client.flushall() # Clean après chaque test
# tests/integration/test_cache.py
from myapp.cache import CacheService
def test_cache_set_and_get(redis_client):
cache = CacheService(redis_client)
cache.set("user:1", {"name": "Alice"}, ttl=60)
result = cache.get("user:1")
assert result == {"name": "Alice"}
def test_cache_expiration(redis_client):
cache = CacheService(redis_client)
cache.set("temp", "value", ttl=1)
import time
time.sleep(1.5)
result = cache.get("temp")
assert result is None
def test_cache_invalidation(redis_client):
cache = CacheService(redis_client)
cache.set("user:1", {"name": "Alice"})
cache.invalidate("user:1")
assert cache.get("user:1") is None
MongoDB avec testcontainers
from testcontainers.mongodb import MongoDbContainer
from pymongo import MongoClient
@pytest.fixture(scope="session")
def mongo_container():
with MongoDbContainer("mongo:7") as container:
yield container
@pytest.fixture
def mongo_db(mongo_container):
client = MongoClient(mongo_container.get_connection_url())
db = client["test_db"]
yield db
client.drop_database("test_db")
def test_insert_document(mongo_db):
collection = mongo_db["products"]
result = collection.insert_one({
"name": "Laptop",
"price": 999.99,
"tags": ["electronics", "computer"]
})
assert result.inserted_id is not None
found = collection.find_one({"name": "Laptop"})
assert found["price"] == 999.99
Conteneurs custom
Pour un service qui n’a pas de module testcontainers dédié :
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
@pytest.fixture(scope="session")
def elasticsearch():
container = (
DockerContainer("elasticsearch:8.12.0")
.with_exposed_ports(9200)
.with_env("discovery.type", "single-node")
.with_env("xpack.security.enabled", "false")
.with_env("ES_JAVA_OPTS", "-Xms512m -Xmx512m")
)
container.start()
wait_for_logs(container, "started", timeout=60)
host = container.get_container_host_ip()
port = container.get_exposed_port(9200)
yield f"http://{host}:{port}"
container.stop()
API Testing : tester tes endpoints
Avec le TestClient de FastAPI
# tests/integration/conftest.py
import pytest
from fastapi.testclient import TestClient
from myapp.main import app
@pytest.fixture
def client():
"""Client de test pour FastAPI."""
with TestClient(app) as client:
yield client
# tests/integration/test_api.py
def test_health_check(client):
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "healthy"}
def test_create_user(client):
response = client.post("/users", json={
"name": "Alice",
"email": "alice@example.com",
"password": "SecurePass123!"
})
assert response.status_code == 201
data = response.json()
assert data["name"] == "Alice"
assert data["email"] == "alice@example.com"
assert "password" not in data # Le password ne doit pas être retourné
assert "id" in data
def test_create_user_duplicate_email(client):
# Premier user
client.post("/users", json={
"name": "Alice",
"email": "alice@example.com",
"password": "Pass123!"
})
# Duplicate
response = client.post("/users", json={
"name": "Alice 2",
"email": "alice@example.com",
"password": "Pass456!"
})
assert response.status_code == 409
assert "already exists" in response.json()["detail"]
def test_create_user_invalid_email(client):
response = client.post("/users", json={
"name": "Bad",
"email": "not-an-email",
"password": "Pass123!"
})
assert response.status_code == 422 # Validation error
def test_get_user(client):
# Create
create_response = client.post("/users", json={
"name": "Bob",
"email": "bob@example.com",
"password": "Pass123!"
})
user_id = create_response.json()["id"]
# Get
response = client.get(f"/users/{user_id}")
assert response.status_code == 200
assert response.json()["name"] == "Bob"
def test_get_nonexistent_user(client):
response = client.get("/users/99999")
assert response.status_code == 404
def test_list_users_pagination(client):
# Créer 15 users
for i in range(15):
client.post("/users", json={
"name": f"User {i}",
"email": f"user{i}@example.com",
"password": "Pass123!"
})
# Page 1
response = client.get("/users?page=1&per_page=10")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 10
assert data["total"] == 15
assert data["page"] == 1
# Page 2
response = client.get("/users?page=2&per_page=10")
assert len(response.json()["items"]) == 5
Avec httpx (pour n’importe quelle API)
import httpx
import pytest
@pytest.fixture(scope="session")
def api_url():
"""URL de l'API de test (peut être un environnement de staging)."""
return "http://localhost:8000"
@pytest.fixture
def api_client(api_url):
with httpx.Client(base_url=api_url, timeout=10) as client:
yield client
def test_api_response_time(api_client):
response = api_client.get("/health")
assert response.status_code == 200
assert response.elapsed.total_seconds() < 0.5 # < 500ms
Tester l’authentification
@pytest.fixture
def auth_headers(client):
"""Obtient un token d'authentification."""
response = client.post("/auth/login", json={
"email": "admin@example.com",
"password": "AdminPass123!"
})
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
def test_protected_endpoint_without_auth(client):
response = client.get("/admin/users")
assert response.status_code == 401
def test_protected_endpoint_with_auth(client, auth_headers):
response = client.get("/admin/users", headers=auth_headers)
assert response.status_code == 200
def test_expired_token(client):
response = client.get(
"/admin/users",
headers={"Authorization": "Bearer expired.token.here"}
)
assert response.status_code == 401
assert response.json()["detail"] == "Token expired"
Tester les webhooks
from unittest.mock import ANY
def test_stripe_webhook(client, mocker):
mock_process = mocker.patch("myapp.webhooks.process_payment")
payload = {
"type": "payment_intent.succeeded",
"data": {
"object": {
"id": "pi_123",
"amount": 9999,
"currency": "eur"
}
}
}
response = client.post(
"/webhooks/stripe",
json=payload,
headers={"Stripe-Signature": "test_sig"}
)
assert response.status_code == 200
mock_process.assert_called_once_with(
payment_id="pi_123",
amount=9999,
currency="eur"
)
Pattern : test database avec migrations
Au lieu de create_all(), applique les vraies migrations Alembic :
# tests/integration/conftest.py
from alembic.config import Config
from alembic import command
@pytest.fixture(scope="session")
def db_engine(postgres_container):
url = postgres_container.get_connection_url()
# Applique les migrations Alembic
alembic_cfg = Config("alembic.ini")
alembic_cfg.set_main_option("sqlalchemy.url", url)
command.upgrade(alembic_cfg, "head")
engine = create_engine(url)
yield engine
engine.dispose()
Avantage : tu testes que tes migrations fonctionnent, pas juste que tes models se créent.
Pattern : fixtures de données réalistes
Avec factories (factory_boy)
# tests/factories.py
import factory
from myapp.models import User, Product, Order
class UserFactory(factory.Factory):
class Meta:
model = User
name = factory.Faker("name", locale="fr_FR")
email = factory.LazyAttribute(lambda o: f"{o.name.lower().replace(' ', '.')}@example.com")
age = factory.Faker("random_int", min=18, max=80)
role = "member"
class Params:
admin = factory.Trait(
role="admin",
email=factory.LazyAttribute(lambda o: f"admin.{o.name.lower().replace(' ', '.')}@example.com")
)
class ProductFactory(factory.Factory):
class Meta:
model = Product
name = factory.Faker("word")
price = factory.Faker("pydecimal", left_digits=3, right_digits=2, positive=True)
stock = factory.Faker("random_int", min=0, max=1000)
class OrderFactory(factory.Factory):
class Meta:
model = Order
user = factory.SubFactory(UserFactory)
status = "pending"
total = factory.LazyAttribute(lambda o: sum(p.price for p in o.products))
# tests/integration/test_orders.py
from tests.factories import UserFactory, ProductFactory, OrderFactory
def test_user_order_count(db_session):
user = UserFactory(name="Alice")
db_session.add(user)
for _ in range(3):
order = OrderFactory(user=user)
db_session.add(order)
db_session.commit()
assert user.order_count == 3
def test_admin_sees_all_orders(db_session, client, auth_headers):
# Créer plusieurs users avec des commandes
for _ in range(5):
user = UserFactory()
db_session.add(user)
order = OrderFactory(user=user)
db_session.add(order)
db_session.commit()
response = client.get("/admin/orders", headers=auth_headers)
assert len(response.json()["items"]) == 5
Tester les tâches asynchrones (Celery, etc.)
from myapp.tasks import send_notification
def test_send_notification_task(mocker, redis_client):
mock_email = mocker.patch("myapp.tasks.email_service.send")
# Exécute la tâche en mode synchrone (sans Celery worker)
result = send_notification.apply(args=["user@example.com", "Hello!"]).get()
assert result == "sent"
mock_email.assert_called_once_with(
to="user@example.com",
body="Hello!"
)
Ou avec un vrai worker Celery en testcontainers :
@pytest.fixture(scope="session")
def celery_worker(redis_container):
"""Configure Celery pour utiliser le Redis de test."""
from myapp.celery_app import celery_app
redis_url = f"redis://{redis_container.get_container_host_ip()}:{redis_container.get_exposed_port(6379)}"
celery_app.conf.update(
broker_url=redis_url,
result_backend=redis_url,
task_always_eager=True, # Exécution synchrone
)
yield celery_app
Tester les event-driven systems
Avec un vrai Kafka (testcontainers)
from testcontainers.kafka import KafkaContainer
from kafka import KafkaProducer, KafkaConsumer
import json
@pytest.fixture(scope="session")
def kafka_container():
with KafkaContainer("confluentinc/cp-kafka:7.6.0") as kafka:
yield kafka
@pytest.fixture
def kafka_producer(kafka_container):
producer = KafkaProducer(
bootstrap_servers=kafka_container.get_bootstrap_server(),
value_serializer=lambda v: json.dumps(v).encode()
)
yield producer
producer.close()
def test_event_processing(kafka_producer, kafka_container):
# Publie un événement
kafka_producer.send("user-events", {
"type": "user_created",
"data": {"id": 1, "name": "Alice"}
})
kafka_producer.flush()
# Consomme et vérifie
consumer = KafkaConsumer(
"user-events",
bootstrap_servers=kafka_container.get_bootstrap_server(),
auto_offset_reset="earliest",
value_deserializer=lambda v: json.loads(v.decode()),
consumer_timeout_ms=5000
)
messages = list(consumer)
assert len(messages) == 1
assert messages[0].value["type"] == "user_created"
Performance des tests d’intégration
Réutiliser les conteneurs
# Scope "session" = un seul conteneur pour tous les tests
@pytest.fixture(scope="session")
def postgres_container():
with PostgresContainer("postgres:16") as postgres:
yield postgres
# Rollback au lieu de recréer la DB à chaque test
@pytest.fixture
def db_session(db_engine):
connection = db_engine.connect()
transaction = connection.begin()
session = sessionmaker(bind=connection)()
yield session
session.close()
transaction.rollback() # ← Instantané, pas de cleanup SQL
connection.close()
Paralléliser avec pytest-xdist
# Chaque worker a ses propres conteneurs
pytest tests/integration/ -n 4
Pour que ça marche, chaque worker doit avoir ses propres conteneurs :
@pytest.fixture(scope="session")
def postgres_container(worker_id):
"""Un conteneur par worker xdist."""
with PostgresContainer("postgres:16") as postgres:
yield postgres
Cache des images Docker
# Pre-pull les images avant les tests
docker pull postgres:16
docker pull redis:7-alpine
docker pull mongo:7
Dans le CI, ajoute ça dans un job de setup ou utilise le cache Docker du runner.
Bonnes pratiques
-
Scope “session” pour les conteneurs — Démarrer un conteneur prend 2-5 secondes. Fais-le une seule fois.
-
Rollback au lieu de truncate —
transaction.rollback()est instantané.TRUNCATE TABLEprend du temps. -
Sépare unit et integration —
tests/unit/ettests/integration/. Lance-les séparément dans le CI :pytest tests/unit/ # Rapide, pas de Docker pytest tests/integration/ # Plus lent, nécessite Docker -
Données réalistes avec factory_boy — Pas de
User(name="test")partout. -
Teste les cas d’erreur — Contraintes DB, timeouts, connexions perdues.
-
Health check avant les tests — Vérifie que le conteneur est prêt :
from testcontainers.core.waiting_utils import wait_for_logs wait_for_logs(container, "database system is ready to accept connections") -
Logs utiles en cas d’échec — Configure pytest pour afficher les logs des conteneurs quand un test échoue.
Le prochain cours intègre tout ça dans un pipeline CI/CD : tests dans GitLab/GitHub, SonarQube, quality gates. La boucle est bouclée. 🔄
À toi de jouer
Exercice 1 — Tests avec Testcontainers
Écris un test d’intégration qui lance un vrai PostgreSQL avec testcontainers :
import pytest
from testcontainers.postgres import PostgresContainer
@pytest.fixture(scope="module")
def postgres():
with PostgresContainer("postgres:16") as pg:
yield pg
def test_insert_and_query(postgres):
conn = postgres.get_connection_url()
# Crée une table, insère des données, vérifie le résultat
Exercice 2 — API Testing complet
Teste une API FastAPI avec le TestClient :
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)
def test_create_and_get():
resp = client.post("/items", json={"name": "test"})
assert resp.status_code == 201
item_id = resp.json()["id"]
resp = client.get(f"/items/{item_id}")
assert resp.json()["name"] == "test"
Teste tous les cas : création, lecture, update, delete, 404, validation errors.
Exercice 3 — Test avec Redis et migrations
Crée un test d’intégration qui lance Redis ET PostgreSQL via testcontainers, applique des migrations Alembic, seed la base avec des fixtures, exécute des tests, et vérifie que le cache Redis est cohérent avec la DB.
Contenu réservé aux abonnés
Ce chapitre fait partie de la formation complète. Abonne-toi pour débloquer tous les contenus.
Débloquer pour 29 CHF/moisLe chapitre 1 de chaque formation est gratuit.
Série pas encore débloquée
Termine la série prérequise d'abord pour accéder à ce contenu.
Aller à la série prérequiseSérie : Tests & Qualité
3 / 4Sur cette page
Articles liés
pytest : le framework de test Python ultime
Maîtrise pytest de A à Z : fixtures, mocking, parametrize, markers, coverage. Tout ce qu'il faut pour tester du Python comme un pro.
FastAPI : ta première API en Python
Comprends les APIs, le protocole HTTP, l'architecture REST, puis installe FastAPI et crée ton premier endpoint. Le socle pour tout ce qui suit.
Routes, paramètres et validation
Path parameters, query parameters, validation des données avec Pydantic et gestion des headers HTTP dans FastAPI.