AccueilBlogTest technique Python async pour la data : asyncio, httpx, ingestion parallèle
Guide recrutement data

Test technique Python async pour la data : asyncio, httpx, ingestion parallèle

L async Python permet de multiplier par 10 à 100 la vitesse d ingestion d APIs. En entretien Senior, on évalue la maîtrise d asyncio pour écrire des pipelines d ingestion efficaces.

Data Builder·Juin 2025·6 min de lecture·Data Engineer
Sommaire
  1. Sync vs Async
  2. asyncio et coroutines
  3. httpx async : ingestion d APIs
  4. Contrôler la concurrence
  5. asyncio.gather et TaskGroup
  6. Pipeline d ingestion async
  7. Grille

1Synchrone vs Asynchrone

Question discriminante

Pourquoi l async est-il utile pour l ingestion de données depuis des APIs ?

  • Problème synchrone — appeler 100 APIs en séquence : 100 * 200ms = 20 secondes d attente pure
  • Solution async — lancer toutes les requêtes en parallèle : 200ms + overhead = 20x plus rapide
  • IO-bound vs CPU-bound — async est efficace pour les opérations IO (réseau, fichiers). Pas pour les calculs CPU (ML, transformations lourdes)
  • GIL Python — async contourne le GIL pour les opérations IO. Pour le CPU, utiliser multiprocessing

2asyncio : le moteur de l async Python

Question discriminante

Qu est-ce qu une coroutine ? Quelle est la différence entre await et asyncio.create_task ?

import asyncio # Coroutine : fonction async qui peut être suspendue async def fetch_data(url: str) -> dict: await asyncio.sleep(0.2) # simule une requête réseau return {'url': url, 'data': 'result'} # await : suspend et attend le résultat result = await fetch_data('https://api.com/orders') # create_task : lancer sans attendre (fire and forget) async def main(): # Séquentiel : 3 * 200ms = 600ms r1 = await fetch_data('https://api.com/orders/1') r2 = await fetch_data('https://api.com/orders/2') r3 = await fetch_data('https://api.com/orders/3') # Parallèle : 200ms results = await asyncio.gather( fetch_data('https://api.com/orders/1'), fetch_data('https://api.com/orders/2'), fetch_data('https://api.com/orders/3') ) asyncio.run(main())

3httpx async : ingestion d APIs

Question discriminante

Comment ingérez-vous 1000 endpoints d une API en parallèle avec httpx ?

import httpx import asyncio from typing import List, Dict async def fetch_endpoint( client: httpx.AsyncClient, url: str, semaphore: asyncio.Semaphore ) -> Dict: async with semaphore: # limite la concurrence try: response = await client.get(url, timeout=30.0) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: if e.response.status_code == 429: await asyncio.sleep(60) # rate limit return await fetch_endpoint(client, url, semaphore) raise async def ingest_all(urls: List[str]) -> List[Dict]: semaphore = asyncio.Semaphore(50) # max 50 requêtes simultanées async with httpx.AsyncClient() as client: tasks = [fetch_endpoint(client, url, semaphore) for url in urls] return await asyncio.gather(*tasks, return_exceptions=True) results = asyncio.run(ingest_all(urls))

4Contrôler la concurrence avec Semaphore

Question discriminante

Pourquoi utilisez-vous un Semaphore ? Que se passe-t-il sans ?

  • Sans Semaphore — asyncio.gather sur 10 000 URLs ouvre 10 000 connexions simultanément. Crash mémoire, ban de l API, saturation réseau
  • Avec Semaphore — limite le nombre de coroutines actives simultanément (50-100 est un bon défaut)
  • asyncio.Semaphore(n) — au plus n coroutines passent en même temps. Les autres attendent
  • Connection pooling — httpx.AsyncClient gère automatiquement un pool de connexions. Réutilise les connexions TCP

5asyncio.gather vs TaskGroup

Question discriminante

Quelle est la différence entre asyncio.gather et asyncio.TaskGroup (Python 3.11+) ?

import asyncio # asyncio.gather : Python 3.8+ # Annule toutes les tâches si l une échoue (return_exceptions=False) results = await asyncio.gather(*tasks, return_exceptions=True) # TaskGroup : Python 3.11+ - syntaxe plus propre async def process_all(urls): results = [] async with asyncio.TaskGroup() as tg: tasks = [tg.create_task(fetch(url)) for url in urls] # Toutes les tâches sont terminées ici return [t.result() for t in tasks] # Timeout global sur un groupe de tâches try: async with asyncio.timeout(30.0): # Python 3.11+ results = await asyncio.gather(*tasks) except TimeoutError: print('Timeout global dépassé')

6Pipeline d ingestion async complet

Question discriminante

Montrez l architecture d un pipeline d ingestion async production-ready.

import asyncio, httpx, json from datetime import datetime class AsyncIngestionPipeline: def __init__(self, max_concurrent: int = 50, batch_size: int = 100): self.semaphore = asyncio.Semaphore(max_concurrent) self.batch_size = batch_size async def fetch(self, client: httpx.AsyncClient, url: str) -> dict | None: async with self.semaphore: for attempt in range(3): try: r = await client.get(url, timeout=30.0) r.raise_for_status() return r.json() except Exception as e: if attempt == 2: return None await asyncio.sleep(2 ** attempt) async def run(self, urls: list[str]) -> list[dict]: async with httpx.AsyncClient() as client: batches = [urls[i:i+self.batch_size] for i in range(0, len(urls), self.batch_size)] all_results = [] for batch in batches: results = await asyncio.gather( *[self.fetch(client, url) for url in batch] ) all_results.extend([r for r in results if r is not None]) return all_results
import asyncio, aiohttp from typing import AsyncIterator async def fetch_batch(session: aiohttp.ClientSession, url: str) -> dict: async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp: resp.raise_for_status() return await resp.json() async def extract_all_pages(base_url: str, pages: list[int]) -> list: async with aiohttp.ClientSession() as session: # Limiter la concurrence a 10 requetes simultanees semaphore = asyncio.Semaphore(10) async def fetch_limited(page): async with semaphore: return await fetch_batch(session, f'{base_url}?page={page}') results = await asyncio.gather(*[fetch_limited(p) for p in pages]) return [item for batch in results for item in batch] # Streaming async pour les gros volumes async def stream_large_dataset(url: str) -> AsyncIterator[dict]: async with aiohttp.ClientSession() as session: async with session.get(url) as resp: async for line in resp.content: yield json.loads(line)
  • asyncio.gather vs asyncio.wait - gather : collecte tous les resultats dans l ordre, leve une exception si l un echoue. wait : plus de controle (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
  • Semaphore pour le rate limiting - asyncio.Semaphore(N) limite N coroutines simultanees. Essentiel pour ne pas surcharger une API avec des quotas
  • aiofiles pour l IO fichier - remplacer open() par aiofiles.open() dans les contextes async. L IO fichier bloque sinon la boucle d evenements
  • asyncpg et aiomysql - drivers de bases de donnees natifs async. Plus performants que les drivers sync dans un contexte asyncio

7Grille par niveau

NiveauMaitriseSignal GONO-GO
Confirméasyncio basique, asyncio.gather, httpx asyncA écrit un ingestion d API avec asyncio.gather, comprend await vs create_taskUtilise requests en sync pour ingérer 1000 endpoints
SeniorSemaphore, retry async, gestion rate limiting, TaskGroupUtilise Semaphore pour contrôler la concurrence, gère le rate limiting de manière asyncLance asyncio.gather sur 10000 URLs sans Semaphore

Vous recrutez un Data Engineer Python ?

Premier entretien gratuit. Rapport GO/NO-GO sous 48h.