Incremental ingestion in Azure Data Factory with the Watermark pattern
João Barros
16 September 2024
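Loading an entire source table on every pipeline run is inefficient and does not scale. The Watermark pattern in Azure Data Factory uses a control table to record how far each table has already been ingested. Each run then copies only the records created or changed since that point. The pattern relies on the source table having a reliable change timestamp, here `updated_at`.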
Control table (Watermark)
-- In the control database (Azure SQL)
CREATE TABLE dbo.watermark_control (
table_name VARCHAR(200) PRIMARY KEY,
last_watermark DATETIME2 NOT NULL DEFAULT '1900-01-01'
);
INSERT INTO dbo.watermark_control (table_name, last_watermark) VALUES ('dbo.Sales', '1900-01-01');
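The sketch below mirrors the control table outside Azure, using Python's built-in `sqlite3` module. It is a stand-in for the Azure SQL control database, not the author's setup. SQLite has no `DATETIME2` type, so timestamps are assumed to be UTC text in one fixed format (`YYYY-MM-DD HH:MM:SS`), which sorts chronologically when compared as strings. The `get_watermark` helper and the `dbo.Customers` entry are hypothetical. The point of the sketch is that the far-past default makes a table's first run a full load.

```python
import sqlite3

# In-memory stand-in for the Azure SQL control database.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE watermark_control (
        table_name     TEXT PRIMARY KEY,
        -- Sentinel far in the past: the first run copies every row.
        last_watermark TEXT NOT NULL DEFAULT '1900-01-01 00:00:00'
    )
""")
with conn:
    # Explicit seed, as in the T-SQL INSERT above ...
    conn.execute("INSERT INTO watermark_control VALUES (?, ?)",
                 ("dbo.Sales", "1900-01-01 00:00:00"))
    # ... or rely on the column DEFAULT by naming only table_name.
    conn.execute("INSERT INTO watermark_control (table_name) VALUES (?)",
                 ("dbo.Customers",))

def get_watermark(conn, table_name):
    """Hypothetical helper playing the role of the Lookup activity."""
    row = conn.execute(
        "SELECT last_watermark FROM watermark_control WHERE table_name = ?",
        (table_name,),
    ).fetchone()
    if row is None:
        raise LookupError(f"{table_name!r} is not registered")
    return row[0]

print(get_watermark(conn, "dbo.Sales"))      # 1900-01-01 00:00:00
print(get_watermark(conn, "dbo.Customers"))  # 1900-01-01 00:00:00
```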
ADF pipeline structure
Pipeline: PL_Incremental_Sales
1. Lookup: Lookup_Watermark (read the current watermark; "First row only" enabled so the result is exposed as firstRow)
Query: SELECT last_watermark FROM dbo.watermark_control
WHERE table_name = 'dbo.Sales'
2. Copy Data (copy new and changed records)
Source (SQL Server):
Query: SELECT * FROM dbo.Sales
WHERE updated_at > '@{activity('Lookup_Watermark').output.firstRow.last_watermark}'
AND updated_at <= '@{pipeline().TriggerTime}'
Sink: ADLS Gen2 / Lakehouse (Parquet / Delta)
3. Stored Procedure (update the watermark; runs only when Copy Data succeeds)
Procedure: usp_update_watermark
Parameters: @table_name = 'dbo.Sales',
@new_wm = @{pipeline().TriggerTime}
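The three activities above can be simulated end to end to show why the window is bounded on both sides. This is a minimal Python/`sqlite3` sketch under stated assumptions. The in-memory `sales` table stands in for `dbo.Sales`, the returned list stands in for the ADLS/Lakehouse sink, and the `run_incremental` function is hypothetical. The trigger time passed in plays the role of `pipeline().TriggerTime`. Because each run's upper bound becomes the next run's lower bound, a given change is neither copied twice nor skipped. A row stamped just after the trigger time waits for the following run.

```python
import sqlite3

# Hypothetical stand-ins for dbo.Sales and dbo.watermark_control.
# Timestamps are UTC text in one fixed format, so string order is time order.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE sales (id INTEGER PRIMARY KEY, amount REAL, updated_at TEXT NOT NULL);
CREATE TABLE watermark_control (
    table_name TEXT PRIMARY KEY,
    last_watermark TEXT NOT NULL DEFAULT '1900-01-01 00:00:00'
);
INSERT INTO watermark_control (table_name) VALUES ('dbo.Sales');
""")

def run_incremental(conn, trigger_time):
    """One run of PL_Incremental_Sales: Lookup -> Copy Data -> Stored Procedure."""
    # 1. Lookup: read the current watermark (first row only).
    (last_wm,) = conn.execute(
        "SELECT last_watermark FROM watermark_control WHERE table_name = ?",
        ("dbo.Sales",),
    ).fetchone()
    # 2. Copy Data: half-open window (last_wm, trigger_time].
    copied = conn.execute(
        "SELECT id, amount, updated_at FROM sales "
        "WHERE updated_at > ? AND updated_at <= ? ORDER BY id",
        (last_wm, trigger_time),
    ).fetchall()
    # 3. Stored Procedure: advance the watermark. Reached only if step 2 did
    #    not raise, mirroring an on-success dependency in ADF.
    with conn:
        conn.execute(
            "UPDATE watermark_control SET last_watermark = ? WHERE table_name = ?",
            (trigger_time, "dbo.Sales"),
        )
    return copied

with conn:
    conn.executemany("INSERT INTO sales VALUES (?, ?, ?)", [
        (1, 10.0, "2024-09-15 08:00:00"),
        (2, 20.0, "2024-09-16 09:30:00"),
    ])
run1 = run_incremental(conn, "2024-09-16 10:00:00")  # full load: ids 1, 2

with conn:
    # Row 2 changes, row 3 arrives, and row 4 is stamped just after the next trigger.
    conn.execute("UPDATE sales SET amount = 25.0, "
                 "updated_at = '2024-09-16 11:15:00' WHERE id = 2")
    conn.executemany("INSERT INTO sales VALUES (?, ?, ?)", [
        (3, 30.0, "2024-09-16 11:45:00"),
        (4, 40.0, "2024-09-16 12:00:05"),
    ])
run2 = run_incremental(conn, "2024-09-16 12:00:00")  # ids 2, 3 (4 is outside the window)
run3 = run_incremental(conn, "2024-09-16 13:00:00")  # id 4 only
```

Row 2 is copied again in the second run because it changed, which is the intended "new or changed" behaviour.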
Watermark update stored procedure
CREATE PROCEDURE dbo.usp_update_watermark
@table_name VARCHAR(200),
@new_wm DATETIME2
AS
UPDATE dbo.watermark_control
SET last_watermark = @new_wm
WHERE table_name = @table_name;
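As written, `usp_update_watermark` updates zero rows without raising an error when `@table_name` is not registered. A typo in the pipeline parameter would therefore leave the watermark where it was, and every later run would re-copy everything since that old value. The sketch below shows one possible hardening. It is not part of the original procedure, and it uses Python/`sqlite3` rather than T-SQL; in T-SQL the equivalent check would test `@@ROWCOUNT` after the `UPDATE`. The idea is to check the affected row count and fail the run loudly. The `update_watermark` function is hypothetical.

```python
import sqlite3

# Minimal control table, as elsewhere: UTC timestamps stored as fixed-format text.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE watermark_control (
    table_name     TEXT PRIMARY KEY,
    last_watermark TEXT NOT NULL DEFAULT '1900-01-01 00:00:00')""")
with conn:
    conn.execute("INSERT INTO watermark_control (table_name) VALUES ('dbo.Sales')")

def update_watermark(conn, table_name, new_wm):
    """Python analogue of usp_update_watermark that refuses to fail silently."""
    with conn:  # commits on success, rolls back if an exception escapes
        cur = conn.execute(
            "UPDATE watermark_control SET last_watermark = ? WHERE table_name = ?",
            (new_wm, table_name),
        )
        # table_name is the primary key, so exactly one row must match.
        if cur.rowcount != 1:
            raise LookupError(f"{table_name!r} not found in watermark_control")

update_watermark(conn, "dbo.Sales", "2024-09-16 10:00:00")
```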
Parameterize for multiple tables
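// ForEach pipeline to process N tables:
1. Lookup (Lookup_Tables) → SELECT table_name, source_query FROM config.tables_to_ingest
("First row only" disabled, so the rows are returned as an array in output.value)
2. ForEach (items: @activity('Lookup_Tables').output.value)
→ Execute Pipeline: PL_Incremental_Generic
Parameters: table_name = @{item().table_name},
source_query = @{item().source_query}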
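The fan-out above can be illustrated with a hypothetical Python/`sqlite3` sketch. Here `run_foreach` plays the Lookup + ForEach and `run_generic` plays `PL_Incremental_Generic`, with `table_name` and `source_query` as its parameters. Several details are assumptions, not taken from the article: the `tables_to_ingest` rows, the example `dbo.Customers` table, and the convention of wrapping `source_query` in a subquery so that the same watermark filter can be appended to any source. SQLite has no `config` schema, so the config table name is unqualified. Because `source_query` is spliced into SQL text, the config table must be trusted.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE sales (id INTEGER PRIMARY KEY, updated_at TEXT NOT NULL);
CREATE TABLE customers (id INTEGER PRIMARY KEY, updated_at TEXT NOT NULL);
CREATE TABLE watermark_control (
    table_name TEXT PRIMARY KEY,
    last_watermark TEXT NOT NULL DEFAULT '1900-01-01 00:00:00'
);
CREATE TABLE tables_to_ingest (table_name TEXT PRIMARY KEY, source_query TEXT NOT NULL);
INSERT INTO tables_to_ingest VALUES
    ('dbo.Sales', 'SELECT id, updated_at FROM sales'),
    ('dbo.Customers', 'SELECT id, updated_at FROM customers');
INSERT INTO watermark_control (table_name) VALUES ('dbo.Sales'), ('dbo.Customers');
INSERT INTO sales VALUES (1, '2024-09-16 09:00:00'), (2, '2024-09-16 09:30:00');
INSERT INTO customers VALUES (7, '2024-09-10 12:00:00');
""")

def run_generic(conn, table_name, source_query, trigger_time):
    """Stand-in for PL_Incremental_Generic: the same three steps, driven by parameters."""
    (last_wm,) = conn.execute(
        "SELECT last_watermark FROM watermark_control WHERE table_name = ?",
        (table_name,),
    ).fetchone()
    # Wrap the configured query so the watermark filter applies to any source.
    rows = conn.execute(
        f"SELECT * FROM ({source_query}) AS src "
        "WHERE updated_at > ? AND updated_at <= ?",
        (last_wm, trigger_time),
    ).fetchall()
    with conn:
        conn.execute(
            "UPDATE watermark_control SET last_watermark = ? WHERE table_name = ?",
            (trigger_time, table_name),
        )
    return rows

def run_foreach(conn, trigger_time):
    """Stand-in for Lookup_Tables + ForEach: one generic run per configured table."""
    configs = conn.execute(
        "SELECT table_name, source_query FROM tables_to_ingest ORDER BY table_name"
    ).fetchall()
    return {name: run_generic(conn, name, query, trigger_time)
            for name, query in configs}

copied = run_foreach(conn, "2024-09-16 10:00:00")
```

Each table keeps its own row in the control table, so adding a table to the pipeline means adding one config row and one watermark row, with no change to the generic pipeline.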
Conclusion
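The Watermark pattern is the foundation of incremental ingestion. With a simple control table and a parameterized pipeline, a single generic pipeline can manage dozens of tables, and each run copies only what changed since the previous one. When only a small fraction of each table changes between runs, this can cut the volume of copied data and the execution time by 90% or more.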