Apache Spark Performance Optimisation: Partitioning, Caching, and Shuffle Management

James Okafor
Data & Cloud Engineer
August 12, 2026 · 11 min read

Most Spark performance problems come from the same root causes: data skew, too many small files, unnecessary shuffle operations, missing broadcast hints, and poor partition configuration. This guide covers the optimisation techniques for each, and the monitoring approach for diagnosing which problem you have before applying a fix.

Understanding Spark's execution model

Spark breaks jobs into stages separated by shuffle operations. Within a stage, tasks execute in parallel — one task per partition. Each task processes a subset of the data.

The critical performance factors:

- **Number of tasks** (controlled by partition count): Too few tasks means some executors sit idle. Too many tasks means each task does trivial work with high overhead.

- **Task duration**: As a rule of thumb, aim for tasks of 1–30 seconds. Tasks under 100ms spend most of their time on scheduling overhead; tasks over 5 minutes tend to become stragglers that hold up the whole stage.

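- **Shuffle volume**: Moving data between stages means writing shuffle files to disk, transferring them over the network, and reading them back. Reduce it wherever you can.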

- **Skew**: If one partition has 10x the data of others, the slowest task determines the stage's duration.
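As a quick illustration of why the slowest task sets the pace, here is a pure-Python sketch that summarises one stage's task durations, for example as copied from the Spark UI's stage page. The function name is hypothetical, and the 100ms and 5-minute cut-offs are this article's rules of thumb rather than Spark settings.

```python
from statistics import median


def summarise_stage(task_seconds):
    """Summarise one stage's task durations in seconds (hypothetical helper)."""
    longest = max(task_seconds)
    typical = median(task_seconds)
    return {
        # The stage cannot finish before its slowest task does.
        "min_stage_seconds": longest,
        # How many times longer the slowest task runs than a typical one.
        "skew_ratio": longest / typical,
        # Tasks so short that scheduling overhead dominates (< 100ms).
        "too_short": sum(1 for t in task_seconds if t < 0.1),
        # Tasks long enough to become stragglers (> 5 minutes).
        "too_long": sum(1 for t in task_seconds if t > 300),
    }


# Nine 3-second tasks and one 6-minute straggler.
print(summarise_stage([3] * 9 + [360]))
# {'min_stage_seconds': 360, 'skew_ratio': 120.0, 'too_short': 0, 'too_long': 1}
```

Nine of the ten tasks finish in 3 seconds, yet the stage still takes at least 6 minutes.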

The Spark UI (served by the driver on port 4040 by default) shows stage timelines, task duration distributions, shuffle volumes, and executor utilisation. Read the Spark UI before optimising; do not guess.

Partition tuning

The default partition count for a shuffle operation is 200 (spark.sql.shuffle.partitions). For small datasets, 200 partitions means 200 tiny tasks — high overhead, slow overall. For large datasets, 200 partitions means large tasks — memory pressure and serialisation overhead.

The target: partition sizes of 100–200MB after shuffle. Set spark.sql.shuffle.partitions to roughly (estimated shuffle data volume in bytes) / 100,000,000, i.e. the shuffle volume divided by 100MB. For a shuffle of 10GB, 100 partitions is reasonable; for a shuffle of 100GB, 1,000 partitions.
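The sizing rule reduces to one division. A minimal sketch, assuming decimal units (1MB = 1,000,000 bytes) and the 100MB low end of the target range; shuffle_partition_count is a hypothetical helper, not a Spark API:

```python
import math

# Assumption: aim for the 100MB low end of the 100-200MB target (decimal units).
TARGET_PARTITION_BYTES = 100 * 1_000_000
GB = 1_000_000_000


def shuffle_partition_count(shuffle_bytes, target_bytes=TARGET_PARTITION_BYTES):
    """Estimate a value for spark.sql.shuffle.partitions from the shuffle volume."""
    # Round up so no partition exceeds the target; always return at least 1.
    return max(1, math.ceil(shuffle_bytes / target_bytes))


print(shuffle_partition_count(10 * GB))   # 100
print(shuffle_partition_count(100 * GB))  # 1000
```

You would then pass the result to spark.conf.set("spark.sql.shuffle.partitions", str(n)) before running the query that shuffles.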

**Adaptive Query Execution (AQE)**: Spark 3.0+ includes AQE, which adjusts the partition count during execution based on actual shuffle output sizes, coalescing small post-shuffle partitions into larger ones. It is enabled by default from Spark 3.2.0; on earlier 3.x versions, set spark.sql.adaptive.enabled = true. AQE also detects and splits skewed partitions in sort-merge joins. Use AQE for most workloads rather than tuning partition counts by hand.
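If you prefer AQE's behaviour to be explicit rather than relying on version defaults, the relevant settings can be collected in one place. This sketch assumes PySpark 3.x; the configuration keys are real Spark SQL settings, but the 128MB advisory partition size (Spark's own default is 64MB) and the helper name build_session are illustrative choices.

```python
# AQE settings for Spark 3.x. The 128MB advisory size is an assumption chosen
# to sit inside the 100-200MB target; Spark's default is 64MB.
AQE_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB",
}


def build_session(app_name="aqe-example"):
    """Create (or reuse) a SparkSession with the AQE settings applied."""
    # Imported here so the settings dict can be inspected without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in AQE_SETTINGS.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

On a session that already exists, the same keys can be changed at runtime with spark.conf.set(key, value).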

**Source data partitioning**: When reading from Parquet or Delta on object storage, Spark splits large files into chunks and packs small files together into read partitions of up to spark.sql.files.maxPartitionBytes (128MB by default), charging each file an estimated open cost (spark.sql.files.openCostInBytes, 4MB by default). Many small files (the "small file problem") still mean slow file listing, per-file overhead, and often many tiny tasks. Either compact files before reading, or tune spark.sql.files.maxPartitionBytes so that small file reads are coalesced into larger partitions.
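One way to compact small files is to rewrite them with a fixed number of output partitions. A minimal PySpark sketch, assuming plain Parquet input, a known total input size, and a hypothetical 128MB target file size; both function names are illustrative:

```python
import math

# Assumption: aim for output files of roughly 128MB (binary units, as Spark uses).
TARGET_FILE_BYTES = 128 * 1024 * 1024


def target_file_count(total_bytes, target_bytes=TARGET_FILE_BYTES):
    """How many output files to write so each is roughly target_bytes."""
    return max(1, math.ceil(total_bytes / target_bytes))


def compact_parquet(spark, src_path, dst_path, total_bytes):
    """Rewrite the Parquet data at src_path as fewer, larger files at dst_path.

    spark is an existing SparkSession; total_bytes is the size of the input,
    e.g. taken from your object store's listing. Write to a different path:
    Spark refuses to overwrite a path it is reading from in the same query.
    """
    n_files = target_file_count(total_bytes)
    (spark.read.parquet(src_path)
        .repartition(n_files)       # shuffle into n_files roughly equal partitions
        .write.mode("overwrite")    # each non-empty partition becomes one file
        .parquet(dst_path))
    return n_files
```

For 1GB of small files, target_file_count returns 8, so the rewrite produces about eight 128MB files.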

Broadcast joins

A standard sort-merge join between a large table and a small table is expensive because Spark shuffles both sides. The small table can instead be broadcast to all executors: each executor holds the entire small table in memory and performs the join locally, without a shuffle.

Automatic broadcast: Spark automatically broadcasts any join side whose estimated size is below spark.sql.autoBroadcastJoinThreshold (10MB by default; set it to -1 to disable automatic broadcasting).

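Manual broadcast hint: When a table is slightly above the threshold, or Spark's size estimate is wrong, wrap the small DataFrame in broadcast() from pyspark.sql.functions. The hint makes Spark broadcast that side regardless of its size estimate, so make sure it really fits in driver and executor memory.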

For joins between two large tables, broadcast is not an option because neither side fits in executor memory. Use a sort-merge join (the default), and where possible arrange for earlier operations to have already partitioned both sides on the join key, so that Spark does not need to shuffle them again.

Handling data skew

Skew occurs when one partition has significantly more data than others. A join on customer_id where one customer has 1 million orders while all others have fewer than 100 will produce one giant partition and many tiny ones. The giant partition becomes a straggler: the stage completes only when its last task completes, and the last task to finish is the skewed one.

**AQE skew handling**: With AQE enabled, Spark 3.0+ automatically detects skewed partitions in sort-merge joins and splits them into smaller tasks. This is controlled by spark.sql.adaptive.skewJoin.enabled, which defaults to true, so it is active whenever AQE is on.
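The Spark SQL tuning documentation describes AQE's skew test: a partition counts as skewed when it is larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor times the median partition size and also larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes. The pure-Python sketch below mimics that documented rule with Spark's default values (a factor of 5 and 256MB) so you can check partition sizes read off the Spark UI; it illustrates the rule and is not Spark's implementation.

```python
from statistics import median

MB = 1024 * 1024
SKEWED_PARTITION_FACTOR = 5.0          # default skewedPartitionFactor
SKEWED_PARTITION_THRESHOLD = 256 * MB  # default skewedPartitionThresholdInBytes


def skewed_partition_ids(partition_bytes,
                         factor=SKEWED_PARTITION_FACTOR,
                         threshold=SKEWED_PARTITION_THRESHOLD):
    """Return indices of partitions that meet both skew conditions."""
    typical = median(partition_bytes)
    return [i for i, size in enumerate(partition_bytes)
            if size > factor * typical and size > threshold]


# Nine 50MB partitions and one 2GB partition: only the last is skewed.
print(skewed_partition_ids([50 * MB] * 9 + [2048 * MB]))  # [9]
# Relative skew alone is not enough: 100MB is 100x the median but under 256MB.
print(skewed_partition_ids([1 * MB] * 9 + [100 * MB]))    # []
```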

**Salting**: For joins with extreme skew that AQE does not handle adequately, add a random salt column (0 to N-1) to the skewed side and join on the key plus the salt, distributing what was one logical key across N partitions. This requires replicating the other join side N times, once per salt value, so splitting a hot key 10 ways makes the non-skewed side 10 times larger. Use it judiciously.

**Filtering before joining**: If the skewed data is caused by a specific subset (one customer, one event type), filter it out before the join, process it separately, and union the results.

Caching and persistence

Spark re-executes a DataFrame's lineage on every action unless the DataFrame is explicitly cached. For DataFrames used more than once in a job, such as a complex intermediate result that feeds several downstream aggregations, caching avoids that recomputation.

Cache in memory (MEMORY_ONLY): Fastest, but partitions that do not fit in memory are not cached at all; Spark recomputes them each time they are needed. Appropriate for DataFrames that fit comfortably in executor memory.

Memory with disk fallback (MEMORY_AND_DISK): Partitions that do not fit in memory are written to disk and read back from there. Slower than MEMORY_ONLY for those partitions, but it avoids recomputation when memory is constrained.

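Cache with serialisation (MEMORY_ONLY_SER): Stores objects as serialised byte arrays, which uses less memory than deserialised objects but costs CPU to serialise and deserialise on access. In PySpark, stored objects are always serialised, so choosing a serialised level makes no difference there.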

**When to cache**: Only cache DataFrames that are reused in the same application. Caching a DataFrame that is used only once wastes memory and adds serialisation overhead. Cache after expensive operations (large joins, aggregations) that would be expensive to recompute.

**Unpersist**: Always call unpersist() when the cached DataFrame is no longer needed. Spark evicts cached blocks only under memory pressure (least recently used first), so a stale cache keeps occupying executor memory until something else needs the space.

Avoiding wide transformations

Every groupBy, distinct, and non-broadcast join normally triggers a shuffle. Rearranging your transformation logic to minimise shuffles is often the highest-ROI optimisation:

- Filter and select only needed columns before a join — reduce data volume going into the shuffle

- Combine multiple groupBy operations on the same key into a single groupBy with multiple aggregations

- Avoid sorting unless the downstream operation requires it: a global sort (orderBy) triggers a shuffle and is often unnecessary

Memory management

Spark splits executor memory into execution (for shuffles, joins, and aggregations) and storage (for caching). Since Spark 1.6, both share a unified pool, by default 60% of the heap left after a 300MB reserve (spark.memory.fraction = 0.6), and each side can borrow the other's unused space. Increase executor memory (spark.executor.memory) for memory-intensive workloads. For workloads that regularly spill to disk during shuffles, increase the shuffle partition count to reduce per-partition data volume rather than simply adding memory.
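To see what the defaults mean for one executor, here is the unified-memory arithmetic as a pure-Python sketch. It assumes the on-heap model with a 300MB reserve, spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5; real figures come out slightly lower because the JVM reports a little less heap than spark.executor.memory. The helper name is hypothetical.

```python
RESERVED_MB = 300  # fixed reserve kept out of the unified pool


def unified_memory_mb(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate on-heap unified memory for one executor, in MB.

    memory_fraction mirrors spark.memory.fraction (default 0.6).
    storage_fraction mirrors spark.memory.storageFraction (default 0.5):
    the share of the pool where cached blocks are immune to eviction by execution.
    """
    usable = executor_memory_mb - RESERVED_MB
    unified = usable * memory_fraction
    protected_storage = unified * storage_fraction
    return unified, protected_storage
```

For an 8GB (8,192MB) executor this gives roughly 4,735MB of unified memory, of which about 2,368MB is storage space that execution cannot evict.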

For the PySpark programming foundations, see What Is Apache Spark. For the Databricks implementation context, see Delta Live Tables. Our cloud engineering practice optimises Spark workloads across Databricks, EMR, and Dataproc. Book a free performance review if your Spark jobs are running slower than expected.
