AccueilBlogTest technique Python async pour la data : asyncio, httpx, ingestion parallèle
Guide recrutement data

Test technique Python async pour la data : asyncio, httpx, ingestion parallèle

L async Python permet de multiplier par 10 à 100 la vitesse d ingestion d APIs. En entretien Senior, on évalue la maîtrise d asyncio pour écrire des pipelines d ingestion efficaces.

Data Builder·Juin 2025·6 min de lecture·Data Engineer
Sommaire
  1. Sync vs Async
  2. asyncio et coroutines
  3. httpx async : ingestion d APIs
  4. Contrôler la concurrence
  5. asyncio.gather et TaskGroup
  6. Pipeline d ingestion async
  7. Grille

1Synchrone vs Asynchrone

Question discriminante

Pourquoi l async est-il utile pour l ingestion de données depuis des APIs ?

2asyncio : le moteur de l async Python

Question discriminante

Qu est-ce qu une coroutine ? Quelle est la différence entre await et asyncio.create_task ?

import asyncio # Coroutine : fonction async qui peut être suspendue async def fetch_data(url: str) -> dict: await asyncio.sleep(0.2) # simule une requête réseau return {'url': url, 'data': 'result'} # await : suspend et attend le résultat result = await fetch_data('https://api.com/orders') # create_task : lancer sans attendre (fire and forget) async def main(): # Séquentiel : 3 * 200ms = 600ms r1 = await fetch_data('https://api.com/orders/1') r2 = await fetch_data('https://api.com/orders/2') r3 = await fetch_data('https://api.com/orders/3') # Parallèle : 200ms results = await asyncio.gather( fetch_data('https://api.com/orders/1'), fetch_data('https://api.com/orders/2'), fetch_data('https://api.com/orders/3') ) asyncio.run(main())

3httpx async : ingestion d APIs

Question discriminante

Comment ingérez-vous 1000 endpoints d une API en parallèle avec httpx ?

import httpx import asyncio from typing import List, Dict async def fetch_endpoint( client: httpx.AsyncClient, url: str, semaphore: asyncio.Semaphore ) -> Dict: async with semaphore: # limite la concurrence try: response = await client.get(url, timeout=30.0) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: if e.response.status_code == 429: await asyncio.sleep(60) # rate limit return await fetch_endpoint(client, url, semaphore) raise async def ingest_all(urls: List[str]) -> List[Dict]: semaphore = asyncio.Semaphore(50) # max 50 requêtes simultanées async with httpx.AsyncClient() as client: tasks = [fetch_endpoint(client, url, semaphore) for url in urls] return await asyncio.gather(*tasks, return_exceptions=True) results = asyncio.run(ingest_all(urls))

4Contrôler la concurrence avec Semaphore

Question discriminante

Pourquoi utilisez-vous un Semaphore ? Que se passe-t-il sans ?

5asyncio.gather vs TaskGroup

Question discriminante

Quelle est la différence entre asyncio.gather et asyncio.TaskGroup (Python 3.11+) ?

import asyncio # asyncio.gather : Python 3.8+ # Annule toutes les tâches si l une échoue (return_exceptions=False) results = await asyncio.gather(*tasks, return_exceptions=True) # TaskGroup : Python 3.11+ - syntaxe plus propre async def process_all(urls): results = [] async with asyncio.TaskGroup() as tg: tasks = [tg.create_task(fetch(url)) for url in urls] # Toutes les tâches sont terminées ici return [t.result() for t in tasks] # Timeout global sur un groupe de tâches try: async with asyncio.timeout(30.0): # Python 3.11+ results = await asyncio.gather(*tasks) except TimeoutError: print('Timeout global dépassé')

6Pipeline d ingestion async complet

Question discriminante

Montrez l architecture d un pipeline d ingestion async production-ready.

import asyncio, httpx, json from datetime import datetime class AsyncIngestionPipeline: def __init__(self, max_concurrent: int = 50, batch_size: int = 100): self.semaphore = asyncio.Semaphore(max_concurrent) self.batch_size = batch_size async def fetch(self, client: httpx.AsyncClient, url: str) -> dict | None: async with self.semaphore: for attempt in range(3): try: r = await client.get(url, timeout=30.0) r.raise_for_status() return r.json() except Exception as e: if attempt == 2: return None await asyncio.sleep(2 ** attempt) async def run(self, urls: list[str]) -> list[dict]: async with httpx.AsyncClient() as client: batches = [urls[i:i+self.batch_size] for i in range(0, len(urls), self.batch_size)] all_results = [] for batch in batches: results = await asyncio.gather( *[self.fetch(client, url) for url in batch] ) all_results.extend([r for r in results if r is not None]) return all_results

7Grille par niveau

NiveauMaitriseSignal GONO-GO
Confirméasyncio basique, asyncio.gather, httpx asyncA écrit un ingestion d API avec asyncio.gather, comprend await vs create_taskUtilise requests en sync pour ingérer 1000 endpoints
SeniorSemaphore, retry async, gestion rate limiting, TaskGroupUtilise Semaphore pour contrôler la concurrence, gère le rate limiting de manière asyncLance asyncio.gather sur 10000 URLs sans Semaphore

Vous recrutez un Data Engineer Python ?

Premier entretien gratuit. Rapport GO/NO-GO sous 48h.