L async Python permet de multiplier par 10 à 100 la vitesse d ingestion d APIs. En entretien Senior, on évalue la maîtrise d asyncio pour écrire des pipelines d ingestion efficaces.
Pourquoi l async est-il utile pour l ingestion de données depuis des APIs ?
Qu est-ce qu une coroutine ? Quelle est la différence entre await et asyncio.create_task ?
import asyncio
# Coroutine : fonction async qui peut être suspendue
async def fetch_data(url: str) -> dict:
await asyncio.sleep(0.2) # simule une requête réseau
return {'url': url, 'data': 'result'}
# await : suspend et attend le résultat
result = await fetch_data('https://api.com/orders')
# create_task : lancer sans attendre (fire and forget)
async def main():
# Séquentiel : 3 * 200ms = 600ms
r1 = await fetch_data('https://api.com/orders/1')
r2 = await fetch_data('https://api.com/orders/2')
r3 = await fetch_data('https://api.com/orders/3')
# Parallèle : 200ms
results = await asyncio.gather(
fetch_data('https://api.com/orders/1'),
fetch_data('https://api.com/orders/2'),
fetch_data('https://api.com/orders/3')
)
asyncio.run(main())Comment ingérez-vous 1000 endpoints d une API en parallèle avec httpx ?
import httpx
import asyncio
from typing import List, Dict
async def fetch_endpoint(
client: httpx.AsyncClient,
url: str,
semaphore: asyncio.Semaphore
) -> Dict:
async with semaphore: # limite la concurrence
try:
response = await client.get(url, timeout=30.0)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
await asyncio.sleep(60) # rate limit
return await fetch_endpoint(client, url, semaphore)
raise
async def ingest_all(urls: List[str]) -> List[Dict]:
semaphore = asyncio.Semaphore(50) # max 50 requêtes simultanées
async with httpx.AsyncClient() as client:
tasks = [fetch_endpoint(client, url, semaphore) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
results = asyncio.run(ingest_all(urls))Pourquoi utilisez-vous un Semaphore ? Que se passe-t-il sans ?
Quelle est la différence entre asyncio.gather et asyncio.TaskGroup (Python 3.11+) ?
import asyncio
# asyncio.gather : Python 3.8+
# Annule toutes les tâches si l une échoue (return_exceptions=False)
results = await asyncio.gather(*tasks, return_exceptions=True)
# TaskGroup : Python 3.11+ - syntaxe plus propre
async def process_all(urls):
results = []
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch(url)) for url in urls]
# Toutes les tâches sont terminées ici
return [t.result() for t in tasks]
# Timeout global sur un groupe de tâches
try:
async with asyncio.timeout(30.0): # Python 3.11+
results = await asyncio.gather(*tasks)
except TimeoutError:
print('Timeout global dépassé')Montrez l architecture d un pipeline d ingestion async production-ready.
import asyncio, httpx, json
from datetime import datetime
class AsyncIngestionPipeline:
def __init__(self, max_concurrent: int = 50, batch_size: int = 100):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.batch_size = batch_size
async def fetch(self, client: httpx.AsyncClient, url: str) -> dict | None:
async with self.semaphore:
for attempt in range(3):
try:
r = await client.get(url, timeout=30.0)
r.raise_for_status()
return r.json()
except Exception as e:
if attempt == 2: return None
await asyncio.sleep(2 ** attempt)
async def run(self, urls: list[str]) -> list[dict]:
async with httpx.AsyncClient() as client:
batches = [urls[i:i+self.batch_size]
for i in range(0, len(urls), self.batch_size)]
all_results = []
for batch in batches:
results = await asyncio.gather(
*[self.fetch(client, url) for url in batch]
)
all_results.extend([r for r in results if r is not None])
return all_results| Niveau | Maitrise | Signal GO | NO-GO |
|---|---|---|---|
| Confirmé | asyncio basique, asyncio.gather, httpx async | A écrit un ingestion d API avec asyncio.gather, comprend await vs create_task | Utilise requests en sync pour ingérer 1000 endpoints |
| Senior | Semaphore, retry async, gestion rate limiting, TaskGroup | Utilise Semaphore pour contrôler la concurrence, gère le rate limiting de manière async | Lance asyncio.gather sur 10000 URLs sans Semaphore |
Premier entretien gratuit. Rapport GO/NO-GO sous 48h.