PySpark is the Python API for Apache Spark — the dominant framework for large-scale data processing. This guide covers the core PySpark abstractions, DataFrame transformations, partitioning and performance tuning, reading from and writing to data lakes, and patterns for production PySpark jobs in analytics engineering contexts.
Why PySpark for Data Engineering
Apache Spark is a widely used framework for large-scale distributed data processing. When datasets exceed what a single machine can hold or process efficiently (typically from tens of gigabytes up to petabytes), Spark splits the data into partitions and processes them in parallel across a cluster.
PySpark is the Python API for Spark. It provides a Pandas-like DataFrame API that runs distributed on a Spark cluster. For data engineers, PySpark enables: large-scale transformations that don't fit in memory, parallel ingestion from distributed sources, complex aggregations over multi-terabyte datasets, and integration with data lake storage (S3, GCS, ADLS) and open table formats (Iceberg, Delta Lake, Hudi).
Spark Architecture
Understanding the architecture helps you write efficient PySpark code:
**Driver**: The process that runs your PySpark application. Your Python code talks to a driver JVM, which maintains the SparkContext, creates execution plans, and coordinates task distribution. In client mode the driver runs on the machine that submitted the job; in cluster mode it runs on a node inside the cluster.
**Executor**: JVM processes running on worker nodes. Executors receive task assignments from the driver, process data partitions, and return results. Each executor manages a set of partitions in memory or spills to disk when memory is exhausted.
**Partitions**: The unit of parallelism. Spark divides DataFrames into partitions; each partition is processed by one task on one executor core. More partitions mean more parallelism, but only up to the number of available cores; beyond that, tasks wait in a queue.
**Lazy evaluation**: Spark builds a logical execution plan as you chain transformations. The plan does not execute until you call an action such as count(), collect(), show(), or a write like df.write.parquet(...). Lazy evaluation lets the Spark optimiser (Catalyst) reorder, push down, and eliminate operations before execution.
**Shuffles**: Data redistribution across partitions — triggered by groupBy, join, repartition, distinct. Shuffles are expensive: they require writing data to disk and transferring across the network. Minimise shuffles for performance.
PySpark DataFrame API
Create a SparkSession: the entry point for all PySpark operations.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETLJob").getOrCreate()
**Reading data**:
- spark.read.parquet("s3://bucket/path/") — read Parquet files from S3
- spark.read.csv("path/", header=True, inferSchema=True) — read CSV with header
- spark.read.format("delta").load("path/") — read Delta Lake table
- spark.read.jdbc(url, table, properties=conn_props) — read from JDBC database
**DataFrame transformations** (lazy — build execution plan):
- df.select("col1", "col2") — column selection
- df.filter(df.status == "active") — row filtering
- df.withColumn("new_col", df.amount * 1.1) — add or replace column
- df.groupBy("region").agg({"amount": "sum", "id": "count"}) — aggregation
- df.join(other_df, "customer_id", "left") — join
- df.dropDuplicates(["order_id"]) — deduplication
**Actions** (trigger execution):
- df.count() — count rows
- df.show(20) — display first 20 rows
- df.collect() — return all rows to driver (dangerous on large datasets)
- df.write.parquet("s3://output/path/") — write output
Column Operations with Functions
PySpark provides a rich library of functions in pyspark.sql.functions:
from pyspark.sql import functions as F
- F.col("column_name") — column reference
- F.lit(42) — literal value
- F.when(condition, value).otherwise(other_value) — conditional
- F.coalesce(F.col("a"), F.col("b")) — first non-null
- F.date_trunc("month", F.col("event_date")) — date truncation
- F.regexp_extract(F.col("url"), pattern, 1) — regex extraction
- F.explode(F.col("array_column")) — flatten array column to rows
- F.to_json(F.col("struct_column")) — serialise struct to JSON string
Window functions for analytical computations:
from pyspark.sql import Window
window_spec = Window.partitionBy("customer_id").orderBy("order_date")
df = df.withColumn("row_num", F.row_number().over(window_spec))
df = df.withColumn("running_total", F.sum("amount").over(window_spec))
df = df.withColumn("prev_order_date", F.lag("order_date", 1).over(window_spec))
Schema Management
Spark infers schemas when reading CSV and JSON, but explicit schemas are safer for production jobs — inferred schemas can change if source data changes.
Define schemas with StructType:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType
schema = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("amount", DecimalType(18, 2), nullable=True),
StructField("order_date", TimestampType(), nullable=False),
])
df = spark.read.schema(schema).parquet("s3://bucket/orders/")
Explicit schemas also document the expected structure of source data. Compare the incoming schema against the expected one at job start so the job fails fast on structural changes.
Partitioning and Performance
**Partition count**: As a rule of thumb, target 128MB-256MB per partition. Too few partitions underutilise the cluster; too many create scheduling overhead. Check with df.rdd.getNumPartitions().
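The rule of thumb above reduces to simple arithmetic. This is a rough sketch in plain Python; `target_partition_count` is a hypothetical helper, and the result is a starting point to pass to repartition() or to a shuffle setting, not a guarantee of good performance.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def target_partition_count(
    dataset_bytes: int,
    target_partition_bytes: int = 128 * MIB,
    min_partitions: int = 1,
) -> int:
    """Hypothetical helper: partitions needed to hit a target partition size."""
    # Integer ceiling division avoids floating-point rounding on large sizes.
    needed = (dataset_bytes + target_partition_bytes - 1) // target_partition_bytes
    return max(min_partitions, needed)


# A 50 GiB dataset at the two ends of the 128 MB to 256 MB range.
partitions_at_128 = target_partition_count(50 * GIB)
partitions_at_256 = target_partition_count(50 * GIB, target_partition_bytes=256 * MIB)
```

For a 50 GiB dataset this gives 400 partitions at 128 MiB each and 200 at 256 MiB each.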
**Repartition vs coalesce**: repartition(n) redistributes data across n partitions with a shuffle — use to increase partition count or to partition by a column. coalesce(n) reduces partition count without a full shuffle — use to reduce small files before writing.
**Partition by column on write**: df.write.partitionBy("date", "region").parquet("s3://output/") — writes data into directory partitions by column values, enabling partition pruning on subsequent reads.
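**Broadcast joins**: For joins where one DataFrame is small (up to a few hundred MB, provided it fits comfortably in driver and executor memory), broadcast it to avoid a shuffle. F.broadcast(small_df) hints Spark to replicate the small DataFrame to all executors, enabling a map-side join, so the large DataFrame is not shuffled for that join. Without the hint, Spark broadcasts automatically only when its size estimate falls below spark.sql.autoBroadcastJoinThreshold (10 MB by default).
**Caching**: df.cache() marks a DataFrame to be stored on the executors (in memory, spilling to disk if it does not fit) the first time an action computes it. Only cache if you access the same DataFrame multiple times in the same job. Call df.unpersist() when done to release the storage.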
**Predicate pushdown**: Spark pushes filter conditions down to the data source when possible — Parquet readers skip row groups that don't match filters, JDBC sources receive WHERE clauses. Ensure filters are on partition columns or indexed columns to maximise pushdown benefit.
Reading from and Writing to Data Lakes
**Reading Parquet from S3/GCS/ADLS**: spark.read.parquet("s3a://bucket/path/"). Configure S3 credentials via Hadoop configuration properties or IAM instance roles. Use partition pruning with filter conditions on partition columns.
**Writing with schema evolution**: Delta Lake and Iceberg handle schema evolution. For plain Parquet writes, manage schema changes manually: adding columns is backward compatible, while removing columns or changing types requires coordination with readers.
**Iceberg integration**: spark.read.format("iceberg").load("glue_catalog.database.table"). Iceberg tables support ACID operations from Spark: df.writeTo("catalog.db.table").overwritePartitions() for partition-level atomic replacement.
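**Delta Lake**: df.write.format("delta").mode("overwrite").save("s3://path/") overwrites a Delta table. For UPSERT operations, use DeltaTable.forPath(spark, path).merge(source_df, merge_condition).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute(); the whenMatchedUpdate() and whenNotMatchedInsert() variants need an explicit column mapping (set= and values= respectively).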
PySpark in Production
**Job submission**: submit with spark-submit, passing JAR dependencies, Python files, and configuration properties. On Databricks or EMR, submit via job APIs or notebook cells.
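**Configuration**: spark.sql.shuffle.partitions controls how many partitions a shuffle produces. It defaults to 200, which is often too few for large datasets and too many for small ones. Set it explicitly based on data volume, for example spark.conf.set("spark.sql.shuffle.partitions", "400"). On Spark 3.x, adaptive query execution can also coalesce small shuffle partitions at runtime.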
**Resource allocation**: executor-memory, executor-cores, num-executors determine cluster utilisation. On YARN or Kubernetes, tune to match cluster capacity without over-allocating.
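The submission and resource settings above map onto spark-submit flags. The sketch below assembles such a command in plain Python; `build_spark_submit` is a hypothetical helper, and the default values (YARN, 4 executors, 4 cores, 8g) are placeholders rather than recommendations.

```python
import shlex


def build_spark_submit(
    app_path,
    *,
    master="yarn",
    deploy_mode="cluster",
    num_executors=4,
    executor_cores=4,
    executor_memory="8g",
    conf=None,
    py_files=None,
    jars=None,
    app_args=None,
):
    """Hypothetical helper: build a spark-submit argument list."""
    cmd = [
        "spark-submit",
        "--master", master,
        "--deploy-mode", deploy_mode,
        "--num-executors", str(num_executors),
        "--executor-cores", str(executor_cores),
        "--executor-memory", executor_memory,
    ]
    # Sort configuration keys so the command is deterministic.
    for key, value in sorted((conf or {}).items()):
        cmd += ["--conf", f"{key}={value}"]
    if py_files:
        cmd += ["--py-files", ",".join(py_files)]
    if jars:
        cmd += ["--jars", ",".join(jars)]
    # The application file comes last, followed by its own arguments.
    cmd.append(app_path)
    cmd += list(app_args or [])
    return cmd


# Placeholder file names and arguments for illustration.
cmd = build_spark_submit(
    "etl_job.py",
    conf={"spark.sql.shuffle.partitions": "400"},
    py_files=["deps.zip"],
    app_args=["--run-date", "2024-01-01"],
)
command_line = shlex.join(cmd)
```

Building the command as a list keeps quoting safe when it is passed to subprocess.run; shlex.join is only used to produce a printable form for logs.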
**Monitoring**: Spark UI (typically port 4040) shows running stages, task timelines, shuffle read/write volumes, and GC overhead. The SQL tab shows query plans with time spent in each operator — use this to identify where time is spent.
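**Error handling**: Wrap job logic in try/except to catch and log failures, and exit with a non-zero status so the scheduler sees the failure. Prefer df.write in append mode with deduplication downstream over overwrite mode: a job that fails midway through an overwrite can leave the target partly deleted or partly written, whereas a failed append leaves the existing data intact.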
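A minimal wrapper for the try/except pattern might look like the following. It is a sketch in plain Python; `run_job` and the `etl_job` logger name are hypothetical, and the job function is assumed to contain the actual Spark logic.

```python
import logging

logger = logging.getLogger("etl_job")


def run_job(job_fn, job_name: str) -> int:
    """Hypothetical wrapper: run job_fn, log the outcome, return an exit code."""
    try:
        job_fn()
    except Exception:
        # logger.exception records the message together with the full traceback.
        logger.exception("Job %s failed", job_name)
        return 1
    logger.info("Job %s succeeded", job_name)
    return 0


def failing_job():
    # Stand-in for Spark logic that raises partway through.
    raise ValueError("source path not found")


ok_code = run_job(lambda: None, "noop")
failed_code = run_job(failing_job, "orders_etl")
```

In a real entry point, the returned code would typically be passed to sys.exit so that the scheduler or orchestrator marks the run as failed.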