Tester des fonctions Python est relativement simple. Tester un pipeline data complet qui orchestre Airflow, dbt et BigQuery est un vrai défi d ingénierie. En entretien, on évalue la stratégie de test complète.
Comment organisez-vous vos tests pour un pipeline data complet ?
Comment testez-vous qu un pipeline Airflow + dbt + Snowflake produit les bons résultats ?
import pytest
from unittest.mock import patch
@pytest.fixture(scope='session')
def test_environment():
# Créer un schema de test dans Snowflake
setup_test_schema('test_pipeline_' + session_id)
load_test_fixtures('tests/fixtures/orders_sample.csv')
yield
teardown_test_schema('test_pipeline_' + session_id)
def test_pipeline_fct_revenue(test_environment):
# Lancer dbt sur le schema de test
result = subprocess.run([
'dbt', 'run', '--target', 'test',
'--select', 'fct_revenue',
'--vars', f'{{schema: test_pipeline_{session_id}}}'
])
assert result.returncode == 0
# Vérifier les résultats
actual = run_query('SELECT SUM(revenue) FROM test_schema.fct_revenue')
expected = 125400.50 # calculé manuellement sur les fixtures
assert abs(actual - expected) < 0.01Comment testez-vous qu un DAG Airflow est correctement configuré sans l exécuter ?
import pytest
from airflow.models import DagBag
@pytest.fixture
def dag_bag():
return DagBag(dag_folder='dags/', include_examples=False)
def test_dags_load_without_errors(dag_bag):
assert len(dag_bag.import_errors) == 0, \
f'DAG import errors: {dag_bag.import_errors}'
def test_pipeline_dag_structure(dag_bag):
dag = dag_bag.get_dag('pipeline_ventes')
assert dag is not None
# Vérifier les tâches attendues
task_ids = [t.task_id for t in dag.tasks]
assert 'extract_orders' in task_ids
assert 'transform_dbt' in task_ids
assert 'validate_quality' in task_ids
def test_dag_schedule(dag_bag):
dag = dag_bag.get_dag('pipeline_ventes')
assert dag.schedule_interval == '@daily'
assert dag.catchup == False # pas de backfill automatique
def test_dag_no_cycles(dag_bag):
for dag_id, dag in dag_bag.dags.items():
# topological sort échoue si cycle
assert dag.topological_sort() is not NoneQu est-ce que le contract testing pour les pipelines data ?
Comment détectez-vous une régression de données lors d une refactorisation dbt ?
# dbt-audit-helper : comparer l ancien et le nouveau modèle
-- audit_helper/compare_and_get_metric_sql
{{ audit_helper.compare_and_get_metric_sql(
a_relation = ref('fct_revenue_old'),
b_relation = ref('fct_revenue_new'),
primary_key = 'order_id'
) }}
-- Comparer les sommes entre ancienne et nouvelle version
SELECT
'total_revenue' AS metric,
SUM(amount) AS old_value,
(SELECT SUM(amount) FROM {{ ref('fct_revenue_new') }}) AS new_value,
ABS(SUM(amount) - (SELECT SUM(amount) FROM {{ ref('fct_revenue_new') }}))
/ NULLIF(SUM(amount), 0) AS pct_diff
FROM {{ ref('fct_revenue_old') }}
HAVING pct_diff > 0.001 -- plus de 0.1% de différence = alerteQu est-ce que le chaos engineering appliqué aux pipelines data ?
import pytest
import pandas as pd
from unittest.mock import patch, MagicMock
from datetime import date
# Test d'un pipeline de transformation
def test_transform_orders():
input_df = pd.DataFrame({
"order_id": [1, 2, 3],
"amount": [100.0, -50.0, 200.0], # -50 = cas invalide
"status": ["completed", "completed", "pending"]
})
result = transform_orders(input_df)
assert len(result) == 2 # les négatifs sont filtrés
assert result["amount"].min() > 0
assert "order_date" in result.columns # colonne ajoutée
assert result["amount"].sum() == 300.0
# Test avec mock de la base de données
@patch("src.pipeline.get_db_connection")
def test_extract_with_mock_db(mock_conn):
mock_conn.return_value.__enter__.return_value.execute.return_value.fetchall.return_value = [
(1, "2025-01-01", 100.0), (2, "2025-01-02", 200.0)
]
result = extract_orders(start_date=date(2025, 1, 1))
assert len(result) == 2
# Test de contrat de données (schéma)
def test_output_schema():
result = transform_orders(sample_df)
assert set(result.columns) == {"order_id", "customer_id", "amount", "status", "order_date"}
assert result.dtypes["amount"] == float
assert result["order_id"].is_unique@pytest.fixture. Centralise les données de test et évite la duplication| Niveau | Maitrise | Signal GO | NO-GO |
|---|---|---|---|
| Confirmé | Tests unitaires, tests DAGs Airflow, dbt tests | A des tests unitaires pour ses transformations, teste ses DAGs avec DagBag | N a aucun test automatisé sur ses pipelines |
| Senior | Tests d intégration, contract testing, tests de régression | A un environnement de test isolé, utilise dbt-audit-helper pour les régressions | Ne sait pas comment tester un pipeline complet |
Premier entretien gratuit. Rapport GO/NO-GO sous 48h.