AccueilBlogTest technique : tester les pipelines data de bout en bout
Guide recrutement data

Test technique : tester les pipelines data de bout en bout

Tester des fonctions Python est relativement simple. Tester un pipeline data complet qui orchestre Airflow, dbt et BigQuery est un vrai défi d ingénierie. En entretien, on évalue la stratégie de test complète.

Data Builder·Juin 2025·6 min de lecture·Data Engineer
Sommaire
  1. Pyramide de tests data
  2. Tests d intégration pipeline
  3. Tester les DAGs Airflow
  4. Contract testing
  5. Tests de régression
  6. Chaos engineering
  7. Grille

1La pyramide de tests pour les pipelines data

Question discriminante

Comment organisez-vous vos tests pour un pipeline data complet ?

  • Unitaires (base) — fonctions Python, transformations SQL avec DuckDB, macros dbt
  • Intégration (milieu) — pipeline complet sur des données de test, validation des sorties dbt
  • End-to-end (sommet) — pipeline complet de la source à la BI, sur un environnement de staging
  • Règle — 70% unitaires (rapides, peu coûteux), 20% intégration, 10% e2e (lents, coûteux)

2Tests d intégration : pipeline sur données de test

Question discriminante

Comment testez-vous qu un pipeline Airflow + dbt + Snowflake produit les bons résultats ?

import pytest from unittest.mock import patch @pytest.fixture(scope='session') def test_environment(): # Créer un schema de test dans Snowflake setup_test_schema('test_pipeline_' + session_id) load_test_fixtures('tests/fixtures/orders_sample.csv') yield teardown_test_schema('test_pipeline_' + session_id) def test_pipeline_fct_revenue(test_environment): # Lancer dbt sur le schema de test result = subprocess.run([ 'dbt', 'run', '--target', 'test', '--select', 'fct_revenue', '--vars', f'{{schema: test_pipeline_{session_id}}}' ]) assert result.returncode == 0 # Vérifier les résultats actual = run_query('SELECT SUM(revenue) FROM test_schema.fct_revenue') expected = 125400.50 # calculé manuellement sur les fixtures assert abs(actual - expected) < 0.01

3Tester les DAGs Airflow

Question discriminante

Comment testez-vous qu un DAG Airflow est correctement configuré sans l exécuter ?

import pytest from airflow.models import DagBag @pytest.fixture def dag_bag(): return DagBag(dag_folder='dags/', include_examples=False) def test_dags_load_without_errors(dag_bag): assert len(dag_bag.import_errors) == 0, \ f'DAG import errors: {dag_bag.import_errors}' def test_pipeline_dag_structure(dag_bag): dag = dag_bag.get_dag('pipeline_ventes') assert dag is not None # Vérifier les tâches attendues task_ids = [t.task_id for t in dag.tasks] assert 'extract_orders' in task_ids assert 'transform_dbt' in task_ids assert 'validate_quality' in task_ids def test_dag_schedule(dag_bag): dag = dag_bag.get_dag('pipeline_ventes') assert dag.schedule_interval == '@daily' assert dag.catchup == False # pas de backfill automatique def test_dag_no_cycles(dag_bag): for dag_id, dag in dag_bag.dags.items(): # topological sort échoue si cycle assert dag.topological_sort() is not None

4Contract testing entre pipelines

Question discriminante

Qu est-ce que le contract testing pour les pipelines data ?

  • Contrat producteur/consommateur — le pipeline A (producteur) s engage sur un schéma et une qualité. Le pipeline B (consommateur) s y fie
  • Test côté producteur — vérifier que les données produites respectent le contract (schéma, fraîcheur, valeurs)
  • Test côté consommateur — vérifier que les données reçues correspondent à ce qui était attendu
  • Implementation — dbt tests + assertions sur le schéma, INFORMATION_SCHEMA pour vérifier les types

5Tests de régression de données

Question discriminante

Comment détectez-vous une régression de données lors d une refactorisation dbt ?

# dbt-audit-helper : comparer l ancien et le nouveau modèle -- audit_helper/compare_and_get_metric_sql {{ audit_helper.compare_and_get_metric_sql( a_relation = ref('fct_revenue_old'), b_relation = ref('fct_revenue_new'), primary_key = 'order_id' ) }} -- Comparer les sommes entre ancienne et nouvelle version SELECT 'total_revenue' AS metric, SUM(amount) AS old_value, (SELECT SUM(amount) FROM {{ ref('fct_revenue_new') }}) AS new_value, ABS(SUM(amount) - (SELECT SUM(amount) FROM {{ ref('fct_revenue_new') }})) / NULLIF(SUM(amount), 0) AS pct_diff FROM {{ ref('fct_revenue_old') }} HAVING pct_diff > 0.001 -- plus de 0.1% de différence = alerte

6Chaos engineering pour les pipelines

Question discriminante

Qu est-ce que le chaos engineering appliqué aux pipelines data ?

  • Principe — injecter délibérément des pannes pour tester la résilience du pipeline
  • Exemples pour la data — couper la source de données, injecter des données corrompues, retarder des livraisons
  • 'Si ma table source est vide, que se passe-t-il ?' — le pipeline échoue proprement avec une alerte, ou produit silencieusement une table vide en production
  • 'Si une colonne est renommée, comment je le détecte ?' — les tests dbt staging détectent le changement de schéma
  • Objectif — trouver les points de défaillance avant qu ils surviennent en production
import pytest import pandas as pd from unittest.mock import patch, MagicMock from datetime import date # Test d'un pipeline de transformation def test_transform_orders(): input_df = pd.DataFrame({ "order_id": [1, 2, 3], "amount": [100.0, -50.0, 200.0], # -50 = cas invalide "status": ["completed", "completed", "pending"] }) result = transform_orders(input_df) assert len(result) == 2 # les négatifs sont filtrés assert result["amount"].min() > 0 assert "order_date" in result.columns # colonne ajoutée assert result["amount"].sum() == 300.0 # Test avec mock de la base de données @patch("src.pipeline.get_db_connection") def test_extract_with_mock_db(mock_conn): mock_conn.return_value.__enter__.return_value.execute.return_value.fetchall.return_value = [ (1, "2025-01-01", 100.0), (2, "2025-01-02", 200.0) ] result = extract_orders(start_date=date(2025, 1, 1)) assert len(result) == 2 # Test de contrat de données (schéma) def test_output_schema(): result = transform_orders(sample_df) assert set(result.columns) == {"order_id", "customer_id", "amount", "status", "order_date"} assert result.dtypes["amount"] == float assert result["order_id"].is_unique
  • Pyramide de tests data — unité (logique de transformation) + intégration (connexion source/dest) + end-to-end (pipeline complet en staging). Priorité aux tests unitaires
  • Fixtures pytest — créer des DataFrames de test réutilisables via @pytest.fixture. Centralise les données de test et évite la duplication
  • Tests de contrat — vérifier que le schéma de sortie est stable. Un changement de colonne ou de type casse les consumers en aval
  • Property-based testing — Hypothesis génère automatiquement des cas de test (valeurs limites, NaN, chaînes vides) que vous n'auriez pas pensé à écrire
  • dbt tests vs pytest — dbt tests pour la qualité des données en SQL (uniqueness, not null, accepted values). pytest pour la logique Python des pipelines

7Grille par niveau

NiveauMaitriseSignal GONO-GO
ConfirméTests unitaires, tests DAGs Airflow, dbt testsA des tests unitaires pour ses transformations, teste ses DAGs avec DagBagN a aucun test automatisé sur ses pipelines
SeniorTests d intégration, contract testing, tests de régressionA un environnement de test isolé, utilise dbt-audit-helper pour les régressionsNe sait pas comment tester un pipeline complet

Vous recrutez un Data Engineer rigoureux ?

Premier entretien gratuit. Rapport GO/NO-GO sous 48h.