AccueilBlogTest technique PySpark optimisation : partitions, caching, broadcast joins
Guide recrutement data

Test technique PySpark optimisation : partitions, caching, broadcast joins

Écrire du PySpark qui fonctionne est facile. Écrire du PySpark qui est rapide en production demande de maîtriser les partitions, les joins et la mémoire. C est ce qu on évalue en entretien Senior.

Data Builder·Juin 2025·7 min de lecture·Data Engineer
Sommaire
  1. Partitions : le fondamental
  2. Minimiser le shuffle
  3. Broadcast joins
  4. Caching stratégique
  5. UDFs : les éviter
  6. Diagnostiquer avec Spark UI
  7. Grille

1Partitions : l unité de parallélisme

Question discriminante

Combien de partitions devriez-vous avoir pour votre job Spark ? Comment les ajuster ?

from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() # Règle empirique : 2-4 partitions par coeur disponible # Pour 100GB de données : 200-400 partitions (128-256MB par partition) # Vérifier le nombre de partitions print(df.rdd.getNumPartitions()) # Repartitionner (shuffle) : distribuer uniformément df_balanced = df.repartition(200) # Repartitionner sur une colonne (pour les jointures) df_keyed = df.repartition(200, 'customer_id') # Coalesce (pas de shuffle) : réduire le nombre de partitions df_smaller = df.coalesce(50) # pour l écriture finale # Configuration shuffle partitions spark.conf.set('spark.sql.shuffle.partitions', '200') # AQE ajuste automatiquement ce nombre depuis Spark 3.0

2Minimiser le shuffle

Question discriminante

Pourquoi le shuffle est-il l opération la plus coûteuse dans Spark ?

# Shuffle = transfert de données entre executors # Cause : groupBy, join, distinct, repartition # MAUVAIS : groupBy sur une colonne à haute cardinalité df.groupBy('user_id').count() # beaucoup de shuffle # MIEUX : filtrer avant de grouper df.filter(df.date >= '2025-01-01').groupBy('region').count() # Bucketing : pré-partitionner pour éviter le shuffle aux joins df.write.bucketBy(50, 'customer_id').sortBy('customer_id')\ .saveAsTable('orders_bucketed') df2.write.bucketBy(50, 'customer_id').sortBy('customer_id')\ .saveAsTable('customers_bucketed') # Maintenant le join ne shuffle pas spark.sql('SELECT * FROM orders_bucketed JOIN customers_bucketed USING (customer_id)')

3Broadcast joins : quand et comment

Question discriminante

Qu est-ce qu un broadcast join ? Spark le fait-il automatiquement ?

from pyspark.sql.functions import broadcast # Sans broadcast : Spark fait un sort-merge join (avec shuffle) result = large_df.join(small_df, 'customer_id') # Avec broadcast : small_df est envoyé à chaque executor # Évite totalement le shuffle sur large_df result = large_df.join(broadcast(small_df), 'customer_id') # Automatique si small_df < spark.sql.autoBroadcastJoinThreshold # Par défaut : 10MB. Augmenter : spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '100m') # AQE (Adaptive Query Execution) peut convertir # automatiquement un sort-merge join en broadcast join # si une table s avère petite après filtrage

4Caching : quand conserver en mémoire

Question discriminante

Quand persistez-vous un DataFrame Spark ? Quelle est la différence entre cache() et persist() ?

from pyspark import StorageLevel # cache() = MEMORY_AND_DISK # Stocker en mémoire d abord, puis disque si nécessaire df_filtered = df.filter(df.status == 'active').cache() # persist() = contrôle granulaire df.persist(StorageLevel.MEMORY_ONLY) # RAM uniquement df.persist(StorageLevel.MEMORY_AND_DISK) # RAM + disque df.persist(StorageLevel.DISK_ONLY) # disque uniquement df.persist(StorageLevel.OFF_HEAP) # mémoire off-heap # Quand cacher : # 1. DataFrame réutilisé plusieurs fois dans le même job # 2. Calcul intermédiaire coûteux # Toujours unpersist après usage df_filtered.unpersist() # Vérifier ce qui est en cache spark.catalog.isCached('ma_table')

5UDFs : les éviter quand possible

Question discriminante

Pourquoi les Python UDFs sont-elles si lentes dans PySpark ? Quelles alternatives ?

from pyspark.sql.functions import pandas_udf, col import pandas as pd # MAUVAIS : Python UDF classique (très lent) from pyspark.sql.functions import udf @udf('string') def categorize(amount): if amount > 1000: return 'high' elif amount > 100: return 'medium' else: return 'low' # MIEUX : Pandas UDF (vectorisé via Apache Arrow) @pandas_udf('string') def categorize_vectorized(amount: pd.Series) -> pd.Series: return pd.cut(amount, bins=[0, 100, 1000, float('inf')], labels=['low', 'medium', 'high']).astype(str) # MEILLEUR : fonctions natives Spark (pas d UDF du tout) from pyspark.sql.functions import when df.withColumn('tier', when(col('amount') > 1000, 'high') .when(col('amount') > 100, 'medium') .otherwise('low') )

6Diagnostiquer avec Spark UI

Question discriminante

Comment utilisez-vous l UI Spark pour identifier les goulots d étranglement ?

7Grille par niveau

NiveauMaitriseSignal GONO-GO
ConfirméPartitions basiques, broadcast joins, cache()Ajuste les shuffle partitions, utilise broadcast sur les petites tablesNe sait pas ce qu est une partition Spark
SeniorBucketing, Pandas UDFs, Spark UI diagnostic, AQEA résolu un data skew, remplace les Python UDFs par des Pandas UDFsUtilise des Python UDFs sans savoir pourquoi c est lent

Vous recrutez un Data Engineer PySpark ?

Premier entretien gratuit. Rapport GO/NO-GO sous 48h.