
Apache Hudi: Streaming Upserts and Incremental Processing on the Data Lake

James Okafor
Lead Data Engineer
December 1, 2027 · 12 min read


Apache Hudi (Hadoop Upserts, Deletes, and Incrementals) is an open table format for storing large analytical datasets in cloud object storage with native support for record-level upserts, deletes, and incremental queries. Originally developed at Uber to handle near-real-time CDC ingestion from hundreds of upstream databases into their analytical platform, Hudi was designed specifically for the high-throughput upsert workloads that are common in streaming data ingestion pipelines.

Hudi vs Iceberg vs Delta Lake

All three open table formats (Hudi, Iceberg, Delta Lake) add ACID semantics and schema evolution to object storage. The key differentiator for Hudi is its optimisation for upsert-heavy workloads at the expense of some query planning flexibility:

**Hudi's primary advantage** — the MOR (Merge-On-Read) table type allows very fast ingestion of upserts: incoming updates are appended to delta log files instead of being merged into the base Parquet files at write time. Merging happens at query time (or during compaction). For CDC pipelines ingesting millions of updates per hour, this keeps the delay between a source change and its arrival in the lake low.

**Hudi's relative weakness** — query planning is less fine-grained than Iceberg's file-statistics-based approach. Iceberg's manifest files carry per-file column statistics that enable fine-grained file pruning; Hudi's query planning relies more on partition pruning and its file-group structure.

**When to choose Hudi** — streaming CDC ingestion pipelines where upsert throughput is the primary concern; when near-real-time landing latency (minutes) matters more than ad-hoc query planning sophistication.

Hudi Table Types

Hudi supports two table types with different read/write trade-offs:

**COW (Copy-On-Write)** — on every upsert, Hudi rewrites the affected Parquet files entirely with the updated records merged in. COW tables always provide the most current data to readers without any merge overhead. The write cost is higher (full file rewrite per affected file); the read cost is lower (pure Parquet, no log file merging).

Appropriate for: tables with low update rates, or where read query performance is more important than write throughput.
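To make the write-cost trade-off concrete, here is a toy copy-on-write upsert in plain Python. It is a conceptual sketch, not Hudi's implementation: a "file" is just a dict of rows keyed by record key, and `cow_upsert` is a hypothetical name. It shows the property described above. Updating one row rewrites every row in the file that contains it, while untouched files are carried over unchanged.

```python
# Toy model of a copy-on-write upsert: NOT Hudi's implementation.
# A "file" is a dict mapping record key -> row; a table is a list of files.
from typing import Dict, List, Tuple

Row = Dict[str, object]
File = Dict[str, Row]


def cow_upsert(files: List[File], updates: Dict[str, Row]) -> Tuple[List[File], int]:
    """Return the new list of files plus the number of rows rewritten.

    Every file containing at least one updated key is rewritten in full;
    untouched files are reused as-is. Keys not found anywhere are treated
    as inserts and written to a fresh file (a simplification).
    """
    new_files: List[File] = []
    rows_rewritten = 0
    remaining = dict(updates)
    for f in files:
        touched = [k for k in f if k in remaining]
        if not touched:
            new_files.append(f)  # untouched file: no write I/O
            continue
        rewritten = dict(f)  # copy the whole file...
        for k in touched:
            rewritten[k] = remaining.pop(k)  # ...and merge the updates in
        rows_rewritten += len(rewritten)
        new_files.append(rewritten)
    if remaining:  # leftover keys are inserts
        new_files.append(dict(remaining))
        rows_rewritten += len(remaining)
    return new_files, rows_rewritten
```

Updating a single key in a three-row file reports three rows rewritten. Real Hudi can also pack new inserts into existing small files, which this toy skips by writing inserts to a new file.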

**MOR (Merge-On-Read)** — incoming upserts are appended to row-based delta log files (Avro format) that sit alongside the Parquet base file in the same file group, rather than rewriting that base file. Readers merge the base files and delta log files at query time to present the current state. Periodically, compaction merges the delta logs into new base Parquet files.

Appropriate for: high-throughput CDC ingestion where write latency matters; tables where the update rate is high relative to query frequency.

The compaction process for MOR tables is critical: without regular compaction, delta log files accumulate and read performance degrades. Compaction should be scheduled to run regularly, with frequency calibrated to the ingestion rate and acceptable query overhead.
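The merge-on-read cycle (append on write, merge on read, fold on compaction) can be modelled with a small toy class. This is illustrative only: `ToyMorFileGroup` is a hypothetical name, the "base file" and "log" are in-memory structures rather than Parquet and Avro, and the latest-`updated_at`-wins rule stands in for Hudi's configurable merge behaviour.

```python
# Toy model of merge-on-read for one file group: NOT Hudi's code or file format.
from typing import Dict, List

Row = Dict[str, object]


class ToyMorFileGroup:
    """A base 'file' plus an append-only delta log."""

    def __init__(self, base: Dict[str, Row], precombine: str = "updated_at"):
        self.base = dict(base)    # stands in for the Parquet base file
        self.log: List[Row] = []  # stands in for Avro delta log blocks
        self.precombine = precombine

    def upsert(self, key: str, row: Row) -> None:
        # Write path: append only; the base file is never touched.
        self.log.append({"_key": key, **row})

    def snapshot_read(self) -> Dict[str, Row]:
        # Read path: merge base + log; the highest precombine value wins
        # (on a tie, the later log entry wins in this toy).
        merged = dict(self.base)
        for entry in self.log:
            key = entry["_key"]
            row = {k: v for k, v in entry.items() if k != "_key"}
            current = merged.get(key)
            if current is None or row[self.precombine] >= current[self.precombine]:
                merged[key] = row
        return merged

    def compact(self) -> None:
        # Compaction: fold the log into a new base and start an empty log.
        self.base = self.snapshot_read()
        self.log = []
```

`upsert` never touches `base`; only `snapshot_read` pays the merge cost, and `compact` resets that cost by producing a new base. The longer the log grows between compactions, the more work every snapshot read does.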

Hudi Table Structure

A Hudi table on object storage contains:

- **Base files** — Parquet files with stable row data

- **Delta log files** — Avro-encoded row-level changes for MOR tables

- **.hoodie/ directory** — Hudi timeline metadata: commit files, compaction plans, rollback files, and cleaning plans

The Hudi timeline is the core metadata layer — every operation (commit, compaction, rollback, clean) creates a timeline entry. The timeline enables time travel, incremental queries, and rollback operations.
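Because every action leaves files in `.hoodie/`, the completed part of the timeline can be read from a directory listing. The sketch below assumes the pre-1.0 layout, in which a completed instant appears as `<instant>.<action>` (e.g. `20240101000000000.deltacommit`) and pending states carry an extra `.requested` or `.inflight` suffix; a completed compaction shows up as a `commit`. Hudi 1.x changed the timeline layout, so treat this as an illustration. In practice, Hudi's own timeline APIs or the Hudi CLI are the reliable way to inspect a timeline. The function name is hypothetical.

```python
# Sketch: list completed instants from a .hoodie/ directory listing.
# ASSUMPTION: pre-1.0 file naming ("<instant>.<action>" when completed).
from typing import List, Tuple

COMPLETED_ACTIONS = {"commit", "deltacommit", "clean", "rollback", "savepoint", "replacecommit"}


def completed_instants(file_names: List[str]) -> List[Tuple[str, str]]:
    """Return (instant_time, action) pairs for completed instants, oldest first."""
    out = []
    for name in file_names:
        parts = name.split(".")
        if len(parts) != 2:
            continue  # e.g. "<ts>.commit.requested" or "<ts>.deltacommit.inflight"
        instant, action = parts
        # "<ts>.inflight" (pending COW commit) and "hoodie.properties" fail this check.
        if instant.isdigit() and action in COMPLETED_ACTIONS:
            out.append((instant, action))
    # Instant times are fixed-width timestamps, so string order is time order.
    return sorted(out)
```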

Incremental Queries

One of Hudi's distinctive features is incremental query support. An incremental query returns only the rows that have changed since a specified point in the Hudi timeline:

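```python
# `spark`: SparkSession with the Hudi bundle; `base_path`: the table's path
incremental_df = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20240101000000")  # pull commits after this
    .load(base_path)
)
```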

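This returns only rows that were inserted or updated in commits after the specified timeline instant. Hard deletes are not returned by a standard incremental query; newer Hudi releases add an optional CDC query format for change records that include deletes. For downstream transformation pipelines that process only new changes, incremental queries eliminate the need to scan the full table, a significant performance advantage for large tables with high change rates.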

Incremental queries are the mechanism that enables Hudi-based CDC pipelines to propagate changes through multiple processing layers efficiently: each layer processes only the incremental changes from the previous layer, not the full table.
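The checkpoint pattern behind layered incremental pipelines can be sketched without Spark. Each downstream job remembers the last commit time it processed and pulls only rows whose `_hoodie_commit_time` (a metadata column Hudi adds to every record) is later. In a real job that checkpoint would be passed as `hoodie.datasource.read.begin.instanttime`. The in-memory "table" and the `incremental_pull` name are hypothetical.

```python
# Toy sketch of checkpointed incremental pulls: NOT the Hudi API.
# Rows are dicts carrying Hudi's _hoodie_commit_time metadata column.
from typing import Dict, List, Optional, Tuple

Row = Dict[str, object]


def incremental_pull(rows: List[Row], checkpoint: Optional[str]) -> Tuple[List[Row], Optional[str]]:
    """Return rows committed strictly after `checkpoint`, plus the next checkpoint."""
    new_rows = [
        r for r in rows
        if checkpoint is None or str(r["_hoodie_commit_time"]) > checkpoint
    ]
    if not new_rows:
        return [], checkpoint  # nothing new: keep the old checkpoint
    next_checkpoint = max(str(r["_hoodie_commit_time"]) for r in new_rows)
    return new_rows, next_checkpoint
```

Persisting the returned checkpoint only after the downstream write succeeds gives at-least-once processing: a failed run simply re-pulls the same commits.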

Hudi with Spark

Hudi's primary compute integration is Apache Spark. Writing to a Hudi table with Spark:

```python
# `df`: DataFrame of order changes; `base_path`: target table path
(
    df.write.format("hudi")
    .option("hoodie.table.name", "orders")
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
    .option("hoodie.datasource.write.operation", "upsert")
    .option("hoodie.datasource.write.recordkey.field", "order_id")
    .option("hoodie.datasource.write.precombine.field", "updated_at")
    .option("hoodie.datasource.write.partitionpath.field", "order_date")
    .mode("append")
    .save(base_path)
)
```

Key options:

- **recordkey.field** — the field that uniquely identifies a row. Hudi uses this to detect which rows to update vs insert.

- **precombine.field** — for deduplication when multiple records arrive for the same key in the same batch, the record with the highest precombine value is kept (typically a timestamp).

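- **partitionpath.field** — the field whose value determines the partition directory each record is written to. By default the directory name is the bare value (e.g. `2024-01-01`); set `hoodie.datasource.write.hive_style_partitioning` to `true` for Hive-style paths such as `order_date=2024-01-01`.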
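The precombine rule for a single incoming batch can be sketched as follows. This is a plain-Python illustration of the semantics described above, not Hudi's code; `precombine_batch` is a hypothetical helper.

```python
# Sketch of precombine semantics for one incoming batch (not Hudi's code):
# for each record key, keep the row with the highest precombine value.
from typing import Dict, Iterable, List

Row = Dict[str, object]


def precombine_batch(rows: Iterable[Row], record_key: str, precombine_field: str) -> List[Row]:
    latest: Dict[object, Row] = {}
    for row in rows:
        key = row[record_key]
        current = latest.get(key)
        # Strictly greater: on a tie, this toy keeps the first row seen.
        if current is None or row[precombine_field] > current[precombine_field]:
            latest[key] = row
    return list(latest.values())
```

With three `order_id = "A"` rows carrying `updated_at` values of 1, 3 and 2, only the row with `updated_at = 3` survives. Choosing an ordering field that strictly increases per key (such as a source commit timestamp or log sequence number) avoids relying on tie-breaking at all.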

Hudi in AWS

Hudi has particularly strong AWS integration:

**EMR (Elastic MapReduce)** — EMR ships with Hudi pre-installed. Hudi tables on EMR write to S3.

**AWS Glue** — AWS Glue has native Hudi support. Glue jobs can read and write Hudi tables, and the Glue Data Catalog can serve as the metastore in which Hudi tables are registered (the Hudi timeline itself stays in the table's .hoodie/ directory).
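One common way to register Hudi tables in the Glue Data Catalog from Spark is Hudi's Hive sync, pointed at a metastore backed by Glue. The option keys below are Hudi's Hive-sync settings. The sketch assumes an EMR cluster already configured to use the Glue Data Catalog as its Hive metastore; the helper name and the database/table values are hypothetical. Depending on your Hudi version, further sync settings (for example, the partition value extractor) may be needed, so check the configuration reference for your release.

```python
# Sketch: Hudi write options that also sync the table into a catalog.
# ASSUMPTION: the cluster's Hive metastore is the Glue Data Catalog.
def hudi_options_with_catalog_sync(table: str, database: str, partition_field: str) -> dict:
    return {
        "hoodie.table.name": table,
        "hoodie.datasource.write.partitionpath.field": partition_field,
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.mode": "hms",  # sync via the (Glue-backed) metastore
        "hoodie.datasource.hive_sync.database": database,
        "hoodie.datasource.hive_sync.table": table,
        "hoodie.datasource.hive_sync.partition_fields": partition_field,
    }
```

The returned dict can be passed to `df.write.format("hudi").options(**opts)` together with the record key, precombine and operation options from the Spark example. For MOR tables, Hive sync registers two catalog tables, with `_ro` (read-optimized) and `_rt` (real-time) suffixes.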

**Athena** — Athena can query Hudi tables registered in the Glue Data Catalog: snapshot queries on COW tables and, for MOR tables, read-optimized queries (base files only) as well as snapshot queries (current state).

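**DMS (Database Migration Service)** — AWS DMS can capture CDC changes from source databases and land them in S3 (as CSV or Parquet files), where a Hudi job on EMR or Glue applies them to a Hudi table. This enables managed CDC pipelines without self-hosted Debezium.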
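Whatever job applies DMS output to Hudi has to turn each change row into an upsert or a delete. DMS CDC files carry an `Op` column (`I`, `U` or `D`), and Hudi ships a DMS-aware payload class (`AWSDmsAvroPayload`) for this purpose. The toy below only illustrates the semantics on an in-memory dict, assuming changes arrive in commit order; `apply_dms_changes` is a hypothetical name.

```python
# Toy: apply DMS-style CDC rows to a keyed table. NOT Hudi's implementation.
# ASSUMPTION: each change row has an "Op" column of "I", "U" or "D",
# and changes are supplied in source commit order.
from typing import Dict, List

Row = Dict[str, object]


def apply_dms_changes(table: Dict[object, Row], changes: List[Row], key: str) -> Dict[object, Row]:
    result = dict(table)  # leave the input table untouched
    for change in changes:
        op = change["Op"]
        row = {k: v for k, v in change.items() if k != "Op"}
        if op == "D":
            result.pop(row[key], None)  # delete: drop the key if present
        elif op in ("I", "U"):
            result[row[key]] = row      # insert/update: upsert by key
        else:
            raise ValueError(f"unexpected Op value: {op!r}")
    return result
```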

Operational Considerations

**Compaction management (MOR)** — for MOR tables, schedule compaction to run regularly. Hudi provides inline compaction (during write operations) and asynchronous compaction (separate job). For production CDC pipelines with high write rates, asynchronous compaction on a schedule (hourly or more frequently) maintains query performance.
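The two compaction modes map onto a handful of write options. The sketch below builds them as a dict. The keys are Hudi compaction configs (`hoodie.compact.inline`, `hoodie.compact.inline.max.delta.commits`, `hoodie.compact.schedule.inline`), but the helper name and the value of 5 delta commits are illustrative, and the keys should be confirmed against the configuration reference for your Hudi version. In the async mode the writer only schedules compaction plans; a separate job (for example Hudi's compactor utility, or a streaming writer with async compaction enabled) has to execute them.

```python
# Sketch of MOR compaction settings; values are illustrative, not tuned.
def mor_compaction_options(mode: str, max_delta_commits: int = 5) -> dict:
    if max_delta_commits < 1:
        raise ValueError("max_delta_commits must be >= 1")
    opts = {
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        # Attempt to schedule compaction after this many delta commits.
        "hoodie.compact.inline.max.delta.commits": str(max_delta_commits),
    }
    if mode == "inline":
        opts["hoodie.compact.inline"] = "true"  # the writer also runs compaction
    elif mode == "async":
        opts["hoodie.compact.inline"] = "false"
        opts["hoodie.compact.schedule.inline"] = "true"  # the writer only plans it
    else:
        raise ValueError("mode must be 'inline' or 'async'")
    return opts
```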

**Table services (cleaning)** — Hudi's cleaning service removes old file versions beyond the configured retention. Without cleaning, old Parquet base files and delta log files accumulate and consume increasing storage. Set the retention (for example, `hoodie.cleaner.commits.retained` under the default KEEP_LATEST_COMMITS policy) to cover your time-travel and incremental-read windows, because cleaned file versions can no longer be read.

**Timeline management** — the Hudi timeline grows with every operation. Archival moves older completed instants from the active timeline into the archived timeline, keeping the active timeline small and metadata operations fast.
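Cleaning and archival retention interact, so it helps to set them together. The sketch below uses Hudi's cleaner and archival keys with the KEEP_LATEST_COMMITS policy. Its ordering check (cleaner retention < archival minimum < archival maximum) reflects the constraint Hudi's config validation enforces as we understand it, so verify it for your version. The helper name is hypothetical. `commits_retained` also bounds how far back incremental and time-travel reads can reach, since the cleaner deletes older file versions.

```python
# Sketch: cleaning + archival retention settings with a consistency check.
def retention_options(commits_retained: int, keep_min: int, keep_max: int) -> dict:
    # Archival should keep more instants than the cleaner retains, and min < max.
    if not (0 < commits_retained < keep_min < keep_max):
        raise ValueError("need 0 < commits_retained < keep_min < keep_max")
    return {
        "hoodie.clean.automatic": "true",
        "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
        "hoodie.cleaner.commits.retained": str(commits_retained),
        "hoodie.keep.min.commits": str(keep_min),
        "hoodie.keep.max.commits": str(keep_max),
    }
```

For example, `retention_options(10, 20, 30)` passes the check, while `retention_options(10, 10, 30)` raises because archival would not keep more instants than the cleaner retains.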

Our data architecture practice designs Hudi-based CDC ingestion architectures for enterprise data teams — contact us to discuss streaming ingestion strategy for your data lake.
