
Incremental ingestion patterns in Azure Data Factory with Watermark

João Barros · 16 September 2024 · 1 min read

Loading an entire source table on every pipeline run is inefficient and does not scale. The Watermark pattern in Azure Data Factory uses a control table to track the state of the last ingestion and copy only new or changed records.

Control table (Watermark)

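-- In the control database (Azure SQL)
CREATE TABLE dbo.watermark_control (
    table_name      VARCHAR(200) NOT NULL PRIMARY KEY,
    last_watermark  DATETIME2    NOT NULL DEFAULT '1900-01-01'
);

-- Seed one row per source table; the early date forces a full load on the first run
INSERT INTO dbo.watermark_control (table_name, last_watermark)
VALUES ('dbo.Sales', '1900-01-01');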

ADF pipeline structure

Pipeline: PL_Incremental_Sales

1. Lookup "Lookup_Watermark" (read the current watermark, first row only)
   Query: SELECT last_watermark FROM dbo.watermark_control
          WHERE table_name = 'dbo.Sales'

2. Copy Data (copy new records)
   Source (SQL Server):
     Query: SELECT * FROM dbo.Sales
            WHERE updated_at > '@{activity('Lookup_Watermark').output.firstRow.last_watermark}'
             AND updated_at <= '@{pipeline().TriggerTime}'
   Sink: ADLS Gen2 / Lakehouse (Parquet / Delta)

3. Stored Procedure (update the watermark after a successful copy)
   Procedure: dbo.usp_update_watermark
   Parameters: @table_name = 'dbo.Sales',
               @new_wm     = @{pipeline().TriggerTime}
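
The three activities above can be simulated locally to check the window logic. The following is a minimal sketch, not the ADF implementation: it assumes Python's built-in sqlite3 as a stand-in for Azure SQL, a hypothetical `sales` table with `id` and `updated_at` columns, and timestamps stored as ISO 8601 strings so that string comparison matches chronological order.

```python
import sqlite3


def run_incremental(conn, table_name, trigger_time):
    """Simulate one pipeline run: lookup, windowed copy, watermark update."""
    # 1. Lookup: read the current watermark for this table.
    (last_wm,) = conn.execute(
        "SELECT last_watermark FROM watermark_control WHERE table_name = ?",
        (table_name,),
    ).fetchone()

    # 2. Copy: select rows in the window (last_wm, trigger_time].
    #    The `sales` table and its columns are assumptions for illustration.
    rows = conn.execute(
        "SELECT id, updated_at FROM sales "
        "WHERE updated_at > ? AND updated_at <= ? ORDER BY id",
        (last_wm, trigger_time),
    ).fetchall()

    # 3. Update: move the watermark to the upper bound of the window.
    conn.execute(
        "UPDATE watermark_control SET last_watermark = ? WHERE table_name = ?",
        (trigger_time, table_name),
    )
    conn.commit()
    return rows


def demo():
    """Run two consecutive loads against an in-memory database."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE watermark_control ("
        "table_name TEXT PRIMARY KEY, last_watermark TEXT NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE sales (id INTEGER PRIMARY KEY, updated_at TEXT NOT NULL)"
    )
    conn.execute(
        "INSERT INTO watermark_control VALUES ('dbo.Sales', '1900-01-01T00:00:00')"
    )
    conn.executemany(
        "INSERT INTO sales VALUES (?, ?)",
        [(1, "2024-09-01T08:00:00"), (2, "2024-09-02T09:30:00")],
    )
    first = run_incremental(conn, "dbo.Sales", "2024-09-03T00:00:00")

    # A new row arrives between the two runs.
    conn.execute("INSERT INTO sales VALUES (3, '2024-09-03T12:00:00')")
    second = run_incremental(conn, "dbo.Sales", "2024-09-04T00:00:00")
    return first, second


if __name__ == "__main__":
    first, second = demo()
    print([row[0] for row in first], [row[0] for row in second])
```

In this sketch the first run returns rows 1 and 2 (a full load, because the seed watermark is 1900-01-01) and the second run returns only row 3. The exclusive lower bound and inclusive upper bound keep consecutive windows from overlapping or leaving gaps.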

Update Stored Procedure

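CREATE PROCEDURE dbo.usp_update_watermark
    @table_name  VARCHAR(200),
    @new_wm      DATETIME2
AS
BEGIN
    SET NOCOUNT ON;

    -- Advance the watermark for one table to the end of the window just copied
    UPDATE dbo.watermark_control
    SET    last_watermark = @new_wm
    WHERE  table_name = @table_name;
END;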
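
One property of this procedure is worth checking: a plain UPDATE changes nothing when the table has no seed row in the control table, so the watermark would never advance. The sketch below illustrates this under assumptions: sqlite3 stands in for Azure SQL, `update_watermark` is a hypothetical Python mirror of the procedure, and `dbo.Customers` is an invented table name that was deliberately not seeded.

```python
import sqlite3


def update_watermark(conn, table_name, new_wm):
    """Mirror the UPDATE in usp_update_watermark; return the rows affected."""
    cur = conn.execute(
        "UPDATE watermark_control SET last_watermark = ? WHERE table_name = ?",
        (new_wm, table_name),
    )
    conn.commit()
    return cur.rowcount


def demo():
    """Update a seeded table and an unseeded one, returning both row counts."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE watermark_control ("
        "table_name TEXT PRIMARY KEY, last_watermark TEXT NOT NULL)"
    )
    conn.execute(
        "INSERT INTO watermark_control VALUES ('dbo.Sales', '1900-01-01T00:00:00')"
    )
    seeded = update_watermark(conn, "dbo.Sales", "2024-09-16T10:00:00")
    # 'dbo.Customers' has no row in the control table (hypothetical example).
    unseeded = update_watermark(conn, "dbo.Customers", "2024-09-16T10:00:00")
    return seeded, unseeded


if __name__ == "__main__":
    print(demo())
```

Here the seeded table reports one affected row and the unseeded table reports zero, which is why every table needs an INSERT into the control table before its first run.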

Parameterize for multiple tables

// ForEach pipeline to process N tables:
1. Lookup "Lookup_Tables" (all rows, so "First row only" must be unchecked)
   Query: SELECT table_name, source_query FROM config.tables_to_ingest
2. ForEach (items: @activity('Lookup_Tables').output.value)
   → Execute Pipeline: PL_Incremental_Generic
     Parameters: table_name   = @{item().table_name}
                 source_query = @{item().source_query}
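
The parent and child pipelines can be sketched in the same way. This is an illustration under assumptions, not the ADF implementation: sqlite3 stands in for Azure SQL, the config table holds a hypothetical `source_table` column instead of `source_query` to keep the example short, and the `sales` and `customers` tables are invented sample data.

```python
import sqlite3


def ingest_table(conn, table_name, source_table, trigger_time):
    """Generic child pipeline: the same three steps, driven by parameters."""
    (last_wm,) = conn.execute(
        "SELECT last_watermark FROM watermark_control WHERE table_name = ?",
        (table_name,),
    ).fetchone()
    # source_table comes from the trusted config table, never from user input.
    rows = conn.execute(
        f"SELECT id FROM {source_table} WHERE updated_at > ? AND updated_at <= ?",
        (last_wm, trigger_time),
    ).fetchall()
    conn.execute(
        "UPDATE watermark_control SET last_watermark = ? WHERE table_name = ?",
        (trigger_time, table_name),
    )
    return len(rows)


def ingest_all(conn, trigger_time):
    """Parent pipeline: Lookup over the config table, then ForEach entry."""
    config = conn.execute(
        "SELECT table_name, source_table FROM tables_to_ingest"
    ).fetchall()
    return {
        name: ingest_table(conn, name, source, trigger_time)
        for name, source in config
    }


def demo():
    """Ingest two tables that start from different watermarks."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE watermark_control (
            table_name TEXT PRIMARY KEY, last_watermark TEXT NOT NULL);
        CREATE TABLE tables_to_ingest (
            table_name TEXT PRIMARY KEY, source_table TEXT NOT NULL);
        CREATE TABLE sales (id INTEGER, updated_at TEXT);
        CREATE TABLE customers (id INTEGER, updated_at TEXT);
        INSERT INTO watermark_control VALUES
            ('dbo.Sales', '1900-01-01T00:00:00'),
            ('dbo.Customers', '2024-09-10T00:00:00');
        INSERT INTO tables_to_ingest VALUES
            ('dbo.Sales', 'sales'), ('dbo.Customers', 'customers');
        INSERT INTO sales VALUES
            (1, '2024-09-01T08:00:00'), (2, '2024-09-12T09:00:00');
        INSERT INTO customers VALUES
            (1, '2024-09-05T08:00:00'), (2, '2024-09-12T09:00:00');
        """
    )
    return ingest_all(conn, "2024-09-16T00:00:00")


if __name__ == "__main__":
    print(demo())
```

With this sample data the run copies two rows for dbo.Sales and one for dbo.Customers, because each table is filtered against its own watermark. Onboarding another table only requires a new row in the config and control tables, not a new pipeline.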

Conclusion

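The Watermark pattern is the foundation of incremental ingestion. With a simple control table and a parameterized pipeline, you can manage dozens of tables with the same generic pipeline. Because each run copies only the rows changed since the last watermark, copied data volume and execution time can drop by 90% or more when only a small share of rows changes between runs.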
