Tester des fonctions Python est relativement simple. Tester un pipeline data complet qui orchestre Airflow, dbt et BigQuery est un vrai défi d ingénierie. En entretien, on évalue la stratégie de test complète.
Comment organisez-vous vos tests pour un pipeline data complet ?
Comment testez-vous qu un pipeline Airflow + dbt + Snowflake produit les bons résultats ?
import pytest
from unittest.mock import patch
@pytest.fixture(scope='session')
def test_environment():
# Créer un schema de test dans Snowflake
setup_test_schema('test_pipeline_' + session_id)
load_test_fixtures('tests/fixtures/orders_sample.csv')
yield
teardown_test_schema('test_pipeline_' + session_id)
def test_pipeline_fct_revenue(test_environment):
# Lancer dbt sur le schema de test
result = subprocess.run([
'dbt', 'run', '--target', 'test',
'--select', 'fct_revenue',
'--vars', f'{{schema: test_pipeline_{session_id}}}'
])
assert result.returncode == 0
# Vérifier les résultats
actual = run_query('SELECT SUM(revenue) FROM test_schema.fct_revenue')
expected = 125400.50 # calculé manuellement sur les fixtures
assert abs(actual - expected) < 0.01Comment testez-vous qu un DAG Airflow est correctement configuré sans l exécuter ?
import pytest
from airflow.models import DagBag
@pytest.fixture
def dag_bag():
return DagBag(dag_folder='dags/', include_examples=False)
def test_dags_load_without_errors(dag_bag):
assert len(dag_bag.import_errors) == 0, \
f'DAG import errors: {dag_bag.import_errors}'
def test_pipeline_dag_structure(dag_bag):
dag = dag_bag.get_dag('pipeline_ventes')
assert dag is not None
# Vérifier les tâches attendues
task_ids = [t.task_id for t in dag.tasks]
assert 'extract_orders' in task_ids
assert 'transform_dbt' in task_ids
assert 'validate_quality' in task_ids
def test_dag_schedule(dag_bag):
dag = dag_bag.get_dag('pipeline_ventes')
assert dag.schedule_interval == '@daily'
assert dag.catchup == False # pas de backfill automatique
def test_dag_no_cycles(dag_bag):
for dag_id, dag in dag_bag.dags.items():
# topological sort échoue si cycle
assert dag.topological_sort() is not NoneQu est-ce que le contract testing pour les pipelines data ?
Comment détectez-vous une régression de données lors d une refactorisation dbt ?
# dbt-audit-helper : comparer l ancien et le nouveau modèle
-- audit_helper/compare_and_get_metric_sql
{{ audit_helper.compare_and_get_metric_sql(
a_relation = ref('fct_revenue_old'),
b_relation = ref('fct_revenue_new'),
primary_key = 'order_id'
) }}
-- Comparer les sommes entre ancienne et nouvelle version
SELECT
'total_revenue' AS metric,
SUM(amount) AS old_value,
(SELECT SUM(amount) FROM {{ ref('fct_revenue_new') }}) AS new_value,
ABS(SUM(amount) - (SELECT SUM(amount) FROM {{ ref('fct_revenue_new') }}))
/ NULLIF(SUM(amount), 0) AS pct_diff
FROM {{ ref('fct_revenue_old') }}
HAVING pct_diff > 0.001 -- plus de 0.1% de différence = alerteQu est-ce que le chaos engineering appliqué aux pipelines data ?
| Niveau | Maitrise | Signal GO | NO-GO |
|---|---|---|---|
| Confirmé | Tests unitaires, tests DAGs Airflow, dbt tests | A des tests unitaires pour ses transformations, teste ses DAGs avec DagBag | N a aucun test automatisé sur ses pipelines |
| Senior | Tests d intégration, contract testing, tests de régression | A un environnement de test isolé, utilise dbt-audit-helper pour les régressions | Ne sait pas comment tester un pipeline complet |
Premier entretien gratuit. Rapport GO/NO-GO sous 48h.