Aller au contenu principal
TestsIntégrationPythonFormation

Tests d'intégration : testcontainers et API testing

30 min de lecture Tests & Qualité — Chapitre 3

Teste avec de vraies bases de données grâce à testcontainers, valide tes APIs avec des tests HTTP complets, et maîtrise les patterns d'intégration testing.

Le problème des mocks en intégration

Les tests unitaires mockent tout. C’est bien pour la rapidité et l’isolation, mais tu ne testes jamais la vraie interaction :

# ❌ Ce test passe, mais est-ce que la vraie requête SQL marche ?
def test_get_user(mocker):
    mock_db = mocker.MagicMock()
    mock_db.query.return_value.filter_by.return_value.first.return_value = User(
        id=1, name="John"
    )
    service = UserService(mock_db)
    user = service.get_by_id(1)
    assert user.name == "John"  # ← Le mock retourne ce que TU lui dis

Ce test ne vérifie pas :

  • Que la requête SQL est correcte
  • Que le mapping ORM fonctionne
  • Que les types sont compatibles
  • Que les contraintes de la DB sont respectées

Pour ça, tu as besoin de vrais tests d’intégration avec de vraies bases de données.


Testcontainers : des conteneurs Docker pour tes tests

testcontainers-python démarre des conteneurs Docker à la demande pour tes tests. Tu testes contre une vraie PostgreSQL, un vrai Redis, un vrai MongoDB — pas un mock.

Installation

pip install testcontainers[postgres,redis,mongodb]

Prérequis : Docker doit tourner sur la machine qui exécute les tests.

PostgreSQL avec testcontainers

# tests/integration/conftest.py
import pytest
from testcontainers.postgres import PostgresContainer
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from myapp.models import Base

@pytest.fixture(scope="session")
def postgres_container():
    """Démarre un conteneur PostgreSQL pour toute la session de tests."""
    with PostgresContainer("postgres:16") as postgres:
        yield postgres

@pytest.fixture(scope="session")
def db_engine(postgres_container):
    """Crée un engine SQLAlchemy connecté au conteneur."""
    engine = create_engine(postgres_container.get_connection_url())
    Base.metadata.create_all(engine)
    yield engine
    engine.dispose()

@pytest.fixture
def db_session(db_engine):
    """Crée une session avec rollback après chaque test."""
    connection = db_engine.connect()
    transaction = connection.begin()
    Session = sessionmaker(bind=connection)
    session = Session()
    
    yield session
    
    session.close()
    transaction.rollback()
    connection.close()
# tests/integration/test_user_repository.py
from myapp.models import User
from myapp.repositories import UserRepository

def test_create_user(db_session):
    repo = UserRepository(db_session)
    
    user = repo.create(name="Alice", email="alice@example.com")
    
    assert user.id is not None
    assert user.name == "Alice"
    
    # Vérifie en DB directement
    saved = db_session.get(User, user.id)
    assert saved.email == "alice@example.com"

def test_find_user_by_email(db_session):
    repo = UserRepository(db_session)
    repo.create(name="Bob", email="bob@example.com")
    
    found = repo.find_by_email("bob@example.com")
    
    assert found is not None
    assert found.name == "Bob"

def test_find_nonexistent_user(db_session):
    repo = UserRepository(db_session)
    
    found = repo.find_by_email("ghost@example.com")
    
    assert found is None

def test_unique_email_constraint(db_session):
    repo = UserRepository(db_session)
    repo.create(name="Charlie", email="charlie@example.com")
    
    with pytest.raises(IntegrityError):
        repo.create(name="Charlie 2", email="charlie@example.com")

Redis avec testcontainers

# tests/integration/conftest.py
from testcontainers.redis import RedisContainer
import redis

@pytest.fixture(scope="session")
def redis_container():
    with RedisContainer("redis:7-alpine") as container:
        yield container

@pytest.fixture
def redis_client(redis_container):
    client = redis.Redis(
        host=redis_container.get_container_host_ip(),
        port=redis_container.get_exposed_port(6379),
        decode_responses=True,
    )
    yield client
    client.flushall()  # Clean après chaque test
# tests/integration/test_cache.py
from myapp.cache import CacheService

def test_cache_set_and_get(redis_client):
    cache = CacheService(redis_client)
    
    cache.set("user:1", {"name": "Alice"}, ttl=60)
    result = cache.get("user:1")
    
    assert result == {"name": "Alice"}

def test_cache_expiration(redis_client):
    cache = CacheService(redis_client)
    
    cache.set("temp", "value", ttl=1)
    
    import time
    time.sleep(1.5)
    
    result = cache.get("temp")
    assert result is None

def test_cache_invalidation(redis_client):
    cache = CacheService(redis_client)
    
    cache.set("user:1", {"name": "Alice"})
    cache.invalidate("user:1")
    
    assert cache.get("user:1") is None

MongoDB avec testcontainers

from testcontainers.mongodb import MongoDbContainer
from pymongo import MongoClient

@pytest.fixture(scope="session")
def mongo_container():
    with MongoDbContainer("mongo:7") as container:
        yield container

@pytest.fixture
def mongo_db(mongo_container):
    client = MongoClient(mongo_container.get_connection_url())
    db = client["test_db"]
    yield db
    client.drop_database("test_db")
def test_insert_document(mongo_db):
    collection = mongo_db["products"]
    
    result = collection.insert_one({
        "name": "Laptop",
        "price": 999.99,
        "tags": ["electronics", "computer"]
    })
    
    assert result.inserted_id is not None
    
    found = collection.find_one({"name": "Laptop"})
    assert found["price"] == 999.99

Conteneurs custom

Pour un service qui n’a pas de module testcontainers dédié :

from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

@pytest.fixture(scope="session")
def elasticsearch():
    container = (
        DockerContainer("elasticsearch:8.12.0")
        .with_exposed_ports(9200)
        .with_env("discovery.type", "single-node")
        .with_env("xpack.security.enabled", "false")
        .with_env("ES_JAVA_OPTS", "-Xms512m -Xmx512m")
    )
    container.start()
    wait_for_logs(container, "started", timeout=60)
    
    host = container.get_container_host_ip()
    port = container.get_exposed_port(9200)
    
    yield f"http://{host}:{port}"
    
    container.stop()

API Testing : tester tes endpoints

Avec le TestClient de FastAPI

# tests/integration/conftest.py
import pytest
from fastapi.testclient import TestClient
from myapp.main import app

@pytest.fixture
def client():
    """Client de test pour FastAPI."""
    with TestClient(app) as client:
        yield client
# tests/integration/test_api.py

def test_health_check(client):
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json() == {"status": "healthy"}

def test_create_user(client):
    response = client.post("/users", json={
        "name": "Alice",
        "email": "alice@example.com",
        "password": "SecurePass123!"
    })
    
    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "Alice"
    assert data["email"] == "alice@example.com"
    assert "password" not in data  # Le password ne doit pas être retourné
    assert "id" in data

def test_create_user_duplicate_email(client):
    # Premier user
    client.post("/users", json={
        "name": "Alice",
        "email": "alice@example.com",
        "password": "Pass123!"
    })
    
    # Duplicate
    response = client.post("/users", json={
        "name": "Alice 2",
        "email": "alice@example.com",
        "password": "Pass456!"
    })
    
    assert response.status_code == 409
    assert "already exists" in response.json()["detail"]

def test_create_user_invalid_email(client):
    response = client.post("/users", json={
        "name": "Bad",
        "email": "not-an-email",
        "password": "Pass123!"
    })
    
    assert response.status_code == 422  # Validation error

def test_get_user(client):
    # Create
    create_response = client.post("/users", json={
        "name": "Bob",
        "email": "bob@example.com",
        "password": "Pass123!"
    })
    user_id = create_response.json()["id"]
    
    # Get
    response = client.get(f"/users/{user_id}")
    
    assert response.status_code == 200
    assert response.json()["name"] == "Bob"

def test_get_nonexistent_user(client):
    response = client.get("/users/99999")
    assert response.status_code == 404

def test_list_users_pagination(client):
    # Créer 15 users
    for i in range(15):
        client.post("/users", json={
            "name": f"User {i}",
            "email": f"user{i}@example.com",
            "password": "Pass123!"
        })
    
    # Page 1
    response = client.get("/users?page=1&per_page=10")
    assert response.status_code == 200
    data = response.json()
    assert len(data["items"]) == 10
    assert data["total"] == 15
    assert data["page"] == 1
    
    # Page 2
    response = client.get("/users?page=2&per_page=10")
    assert len(response.json()["items"]) == 5

Avec httpx (pour n’importe quelle API)

import httpx
import pytest

@pytest.fixture(scope="session")
def api_url():
    """URL de l'API de test (peut être un environnement de staging)."""
    return "http://localhost:8000"

@pytest.fixture
def api_client(api_url):
    with httpx.Client(base_url=api_url, timeout=10) as client:
        yield client

def test_api_response_time(api_client):
    response = api_client.get("/health")
    
    assert response.status_code == 200
    assert response.elapsed.total_seconds() < 0.5  # < 500ms

Tester l’authentification

@pytest.fixture
def auth_headers(client):
    """Obtient un token d'authentification."""
    response = client.post("/auth/login", json={
        "email": "admin@example.com",
        "password": "AdminPass123!"
    })
    token = response.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}

def test_protected_endpoint_without_auth(client):
    response = client.get("/admin/users")
    assert response.status_code == 401

def test_protected_endpoint_with_auth(client, auth_headers):
    response = client.get("/admin/users", headers=auth_headers)
    assert response.status_code == 200

def test_expired_token(client):
    response = client.get(
        "/admin/users",
        headers={"Authorization": "Bearer expired.token.here"}
    )
    assert response.status_code == 401
    assert response.json()["detail"] == "Token expired"

Tester les webhooks

from unittest.mock import ANY

def test_stripe_webhook(client, mocker):
    mock_process = mocker.patch("myapp.webhooks.process_payment")
    
    payload = {
        "type": "payment_intent.succeeded",
        "data": {
            "object": {
                "id": "pi_123",
                "amount": 9999,
                "currency": "eur"
            }
        }
    }
    
    response = client.post(
        "/webhooks/stripe",
        json=payload,
        headers={"Stripe-Signature": "test_sig"}
    )
    
    assert response.status_code == 200
    mock_process.assert_called_once_with(
        payment_id="pi_123",
        amount=9999,
        currency="eur"
    )

Pattern : test database avec migrations

Au lieu de create_all(), applique les vraies migrations Alembic :

# tests/integration/conftest.py
from alembic.config import Config
from alembic import command

@pytest.fixture(scope="session")
def db_engine(postgres_container):
    url = postgres_container.get_connection_url()
    
    # Applique les migrations Alembic
    alembic_cfg = Config("alembic.ini")
    alembic_cfg.set_main_option("sqlalchemy.url", url)
    command.upgrade(alembic_cfg, "head")
    
    engine = create_engine(url)
    yield engine
    engine.dispose()

Avantage : tu testes que tes migrations fonctionnent, pas juste que tes models se créent.


Pattern : fixtures de données réalistes

Avec factories (factory_boy)

# tests/factories.py
import factory
from myapp.models import User, Product, Order

class UserFactory(factory.Factory):
    class Meta:
        model = User
    
    name = factory.Faker("name", locale="fr_FR")
    email = factory.LazyAttribute(lambda o: f"{o.name.lower().replace(' ', '.')}@example.com")
    age = factory.Faker("random_int", min=18, max=80)
    role = "member"
    
    class Params:
        admin = factory.Trait(
            role="admin",
            email=factory.LazyAttribute(lambda o: f"admin.{o.name.lower().replace(' ', '.')}@example.com")
        )

class ProductFactory(factory.Factory):
    class Meta:
        model = Product
    
    name = factory.Faker("word")
    price = factory.Faker("pydecimal", left_digits=3, right_digits=2, positive=True)
    stock = factory.Faker("random_int", min=0, max=1000)

class OrderFactory(factory.Factory):
    class Meta:
        model = Order
    
    user = factory.SubFactory(UserFactory)
    status = "pending"
    total = factory.LazyAttribute(lambda o: sum(p.price for p in o.products))
# tests/integration/test_orders.py
from tests.factories import UserFactory, ProductFactory, OrderFactory

def test_user_order_count(db_session):
    user = UserFactory(name="Alice")
    db_session.add(user)
    
    for _ in range(3):
        order = OrderFactory(user=user)
        db_session.add(order)
    
    db_session.commit()
    
    assert user.order_count == 3

def test_admin_sees_all_orders(db_session, client, auth_headers):
    # Créer plusieurs users avec des commandes
    for _ in range(5):
        user = UserFactory()
        db_session.add(user)
        order = OrderFactory(user=user)
        db_session.add(order)
    db_session.commit()
    
    response = client.get("/admin/orders", headers=auth_headers)
    assert len(response.json()["items"]) == 5

Tester les tâches asynchrones (Celery, etc.)

from myapp.tasks import send_notification

def test_send_notification_task(mocker, redis_client):
    mock_email = mocker.patch("myapp.tasks.email_service.send")
    
    # Exécute la tâche en mode synchrone (sans Celery worker)
    result = send_notification.apply(args=["user@example.com", "Hello!"]).get()
    
    assert result == "sent"
    mock_email.assert_called_once_with(
        to="user@example.com",
        body="Hello!"
    )

Ou avec un vrai worker Celery en testcontainers :

@pytest.fixture(scope="session")
def celery_worker(redis_container):
    """Configure Celery pour utiliser le Redis de test."""
    from myapp.celery_app import celery_app
    
    redis_url = f"redis://{redis_container.get_container_host_ip()}:{redis_container.get_exposed_port(6379)}"
    celery_app.conf.update(
        broker_url=redis_url,
        result_backend=redis_url,
        task_always_eager=True,  # Exécution synchrone
    )
    yield celery_app

Tester les event-driven systems

Avec un vrai Kafka (testcontainers)

from testcontainers.kafka import KafkaContainer
from kafka import KafkaProducer, KafkaConsumer
import json

@pytest.fixture(scope="session")
def kafka_container():
    with KafkaContainer("confluentinc/cp-kafka:7.6.0") as kafka:
        yield kafka

@pytest.fixture
def kafka_producer(kafka_container):
    producer = KafkaProducer(
        bootstrap_servers=kafka_container.get_bootstrap_server(),
        value_serializer=lambda v: json.dumps(v).encode()
    )
    yield producer
    producer.close()

def test_event_processing(kafka_producer, kafka_container):
    # Publie un événement
    kafka_producer.send("user-events", {
        "type": "user_created",
        "data": {"id": 1, "name": "Alice"}
    })
    kafka_producer.flush()
    
    # Consomme et vérifie
    consumer = KafkaConsumer(
        "user-events",
        bootstrap_servers=kafka_container.get_bootstrap_server(),
        auto_offset_reset="earliest",
        value_deserializer=lambda v: json.loads(v.decode()),
        consumer_timeout_ms=5000
    )
    
    messages = list(consumer)
    assert len(messages) == 1
    assert messages[0].value["type"] == "user_created"

Performance des tests d’intégration

Réutiliser les conteneurs

# Scope "session" = un seul conteneur pour tous les tests
@pytest.fixture(scope="session")
def postgres_container():
    with PostgresContainer("postgres:16") as postgres:
        yield postgres

# Rollback au lieu de recréer la DB à chaque test
@pytest.fixture
def db_session(db_engine):
    connection = db_engine.connect()
    transaction = connection.begin()
    session = sessionmaker(bind=connection)()
    
    yield session
    
    session.close()
    transaction.rollback()  # ← Instantané, pas de cleanup SQL
    connection.close()

Paralléliser avec pytest-xdist

# Chaque worker a ses propres conteneurs
pytest tests/integration/ -n 4

Pour que ça marche, chaque worker doit avoir ses propres conteneurs :

@pytest.fixture(scope="session")
def postgres_container(worker_id):
    """Un conteneur par worker xdist."""
    with PostgresContainer("postgres:16") as postgres:
        yield postgres

Cache des images Docker

# Pre-pull les images avant les tests
docker pull postgres:16
docker pull redis:7-alpine
docker pull mongo:7

Dans le CI, ajoute ça dans un job de setup ou utilise le cache Docker du runner.


Bonnes pratiques

  1. Scope “session” pour les conteneurs — Démarrer un conteneur prend 2-5 secondes. Fais-le une seule fois.

  2. Rollback au lieu de truncatetransaction.rollback() est instantané. TRUNCATE TABLE prend du temps.

  3. Sépare unit et integrationtests/unit/ et tests/integration/. Lance-les séparément dans le CI :

    pytest tests/unit/           # Rapide, pas de Docker
    pytest tests/integration/    # Plus lent, nécessite Docker
  4. Données réalistes avec factory_boy — Pas de User(name="test") partout.

  5. Teste les cas d’erreur — Contraintes DB, timeouts, connexions perdues.

  6. Health check avant les tests — Vérifie que le conteneur est prêt :

    from testcontainers.core.waiting_utils import wait_for_logs
    wait_for_logs(container, "database system is ready to accept connections")
  7. Logs utiles en cas d’échec — Configure pytest pour afficher les logs des conteneurs quand un test échoue.

Le prochain cours intègre tout ça dans un pipeline CI/CD : tests dans GitLab/GitHub, SonarQube, quality gates. La boucle est bouclée. 🔄

À toi de jouer

Exercice 1 — Tests avec Testcontainers

Écris un test d’intégration qui lance un vrai PostgreSQL avec testcontainers :

import pytest
from testcontainers.postgres import PostgresContainer

@pytest.fixture(scope="module")
def postgres():
    with PostgresContainer("postgres:16") as pg:
        yield pg

def test_insert_and_query(postgres):
    conn = postgres.get_connection_url()
    # Crée une table, insère des données, vérifie le résultat

Exercice 2 — API Testing complet

Teste une API FastAPI avec le TestClient :

from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

def test_create_and_get():
    resp = client.post("/items", json={"name": "test"})
    assert resp.status_code == 201
    item_id = resp.json()["id"]
    resp = client.get(f"/items/{item_id}")
    assert resp.json()["name"] == "test"

Teste tous les cas : création, lecture, update, delete, 404, validation errors.

Exercice 3 — Test avec Redis et migrations

Crée un test d’intégration qui lance Redis ET PostgreSQL via testcontainers, applique des migrations Alembic, seed la base avec des fixtures, exécute des tests, et vérifie que le cache Redis est cohérent avec la DB.

Articles liés