Écrire du PySpark qui fonctionne est facile. Écrire du PySpark qui est rapide en production demande de maîtriser les partitions, les joins et la mémoire. C est ce qu on évalue en entretien Senior.
Combien de partitions devriez-vous avoir pour votre job Spark ? Comment les ajuster ?
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Règle empirique : 2-4 partitions par coeur disponible
# Pour 100GB de données : 200-400 partitions (128-256MB par partition)
# Vérifier le nombre de partitions
print(df.rdd.getNumPartitions())
# Repartitionner (shuffle) : distribuer uniformément
df_balanced = df.repartition(200)
# Repartitionner sur une colonne (pour les jointures)
df_keyed = df.repartition(200, 'customer_id')
# Coalesce (pas de shuffle) : réduire le nombre de partitions
df_smaller = df.coalesce(50) # pour l écriture finale
# Configuration shuffle partitions
spark.conf.set('spark.sql.shuffle.partitions', '200')
# AQE ajuste automatiquement ce nombre depuis Spark 3.0Pourquoi le shuffle est-il l opération la plus coûteuse dans Spark ?
# Shuffle = transfert de données entre executors
# Cause : groupBy, join, distinct, repartition
# MAUVAIS : groupBy sur une colonne à haute cardinalité
df.groupBy('user_id').count() # beaucoup de shuffle
# MIEUX : filtrer avant de grouper
df.filter(df.date >= '2025-01-01').groupBy('region').count()
# Bucketing : pré-partitionner pour éviter le shuffle aux joins
df.write.bucketBy(50, 'customer_id').sortBy('customer_id')\
.saveAsTable('orders_bucketed')
df2.write.bucketBy(50, 'customer_id').sortBy('customer_id')\
.saveAsTable('customers_bucketed')
# Maintenant le join ne shuffle pas
spark.sql('SELECT * FROM orders_bucketed JOIN customers_bucketed USING (customer_id)')Qu est-ce qu un broadcast join ? Spark le fait-il automatiquement ?
from pyspark.sql.functions import broadcast
# Sans broadcast : Spark fait un sort-merge join (avec shuffle)
result = large_df.join(small_df, 'customer_id')
# Avec broadcast : small_df est envoyé à chaque executor
# Évite totalement le shuffle sur large_df
result = large_df.join(broadcast(small_df), 'customer_id')
# Automatique si small_df < spark.sql.autoBroadcastJoinThreshold
# Par défaut : 10MB. Augmenter :
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '100m')
# AQE (Adaptive Query Execution) peut convertir
# automatiquement un sort-merge join en broadcast join
# si une table s avère petite après filtrageQuand persistez-vous un DataFrame Spark ? Quelle est la différence entre cache() et persist() ?
from pyspark import StorageLevel
# cache() = MEMORY_AND_DISK
# Stocker en mémoire d abord, puis disque si nécessaire
df_filtered = df.filter(df.status == 'active').cache()
# persist() = contrôle granulaire
df.persist(StorageLevel.MEMORY_ONLY) # RAM uniquement
df.persist(StorageLevel.MEMORY_AND_DISK) # RAM + disque
df.persist(StorageLevel.DISK_ONLY) # disque uniquement
df.persist(StorageLevel.OFF_HEAP) # mémoire off-heap
# Quand cacher :
# 1. DataFrame réutilisé plusieurs fois dans le même job
# 2. Calcul intermédiaire coûteux
# Toujours unpersist après usage
df_filtered.unpersist()
# Vérifier ce qui est en cache
spark.catalog.isCached('ma_table')Pourquoi les Python UDFs sont-elles si lentes dans PySpark ? Quelles alternatives ?
from pyspark.sql.functions import pandas_udf, col
import pandas as pd
# MAUVAIS : Python UDF classique (très lent)
from pyspark.sql.functions import udf
@udf('string')
def categorize(amount):
if amount > 1000: return 'high'
elif amount > 100: return 'medium'
else: return 'low'
# MIEUX : Pandas UDF (vectorisé via Apache Arrow)
@pandas_udf('string')
def categorize_vectorized(amount: pd.Series) -> pd.Series:
return pd.cut(amount, bins=[0, 100, 1000, float('inf')],
labels=['low', 'medium', 'high']).astype(str)
# MEILLEUR : fonctions natives Spark (pas d UDF du tout)
from pyspark.sql.functions import when
df.withColumn('tier',
when(col('amount') > 1000, 'high')
.when(col('amount') > 100, 'medium')
.otherwise('low')
)Comment utilisez-vous l UI Spark pour identifier les goulots d étranglement ?
| Niveau | Maitrise | Signal GO | NO-GO |
|---|---|---|---|
| Confirmé | Partitions basiques, broadcast joins, cache() | Ajuste les shuffle partitions, utilise broadcast sur les petites tables | Ne sait pas ce qu est une partition Spark |
| Senior | Bucketing, Pandas UDFs, Spark UI diagnostic, AQE | A résolu un data skew, remplace les Python UDFs par des Pandas UDFs | Utilise des Python UDFs sans savoir pourquoi c est lent |
Premier entretien gratuit. Rapport GO/NO-GO sous 48h.