(+351) 21 24 10006  ·  info@bconcepts.pt
Carnaxide, Lisboa
Azure Data Factory
Azure Data Factory 1 min

Padrões de ingestão incremental no Azure Data Factory com Watermark

João Barros 16 de September de 2024 1 min de leitura

Carregar toda uma tabela fonte em cada execução do pipeline é ineficiente e não escala. O padrão Watermark no Azure Data Factory usa uma tabela de controlo para rastrear o estado da última ingestão e copiar apenas os registos novos ou alterados.

Tabela de controlo (Watermark)

-- Na base de dados de controlo (Azure SQL)
CREATE TABLE dbo.watermark_control (
    table_name      VARCHAR(200) PRIMARY KEY,
    last_watermark  DATETIME2   NOT NULL DEFAULT '1900-01-01'
);

INSERT INTO dbo.watermark_control VALUES ('dbo.Vendas', '1900-01-01');

Estrutura do pipeline ADF

Pipeline: PL_Incremental_Vendas

1. Lookup (ler watermark actual)
   Query: SELECT last_watermark FROM watermark_control
          WHERE table_name = 'dbo.Vendas'

2. Copy Data (copiar novos registos)
   Source (SQL Server):
     Query: SELECT * FROM dbo.Vendas
            WHERE updated_at > '@{activity('Lookup_Watermark').output.firstRow.last_watermark}'
             AND updated_at <= '@{pipeline().TriggerTime}'
   Sink: ADLS Gen2 / Lakehouse (Parquet / Delta)

3. Stored Procedure (actualizar watermark)
   Procedure: usp_update_watermark
   Parameters: @table_name = 'dbo.Vendas',
               @new_watermark = @{pipeline().TriggerTime}

Stored Procedure de actualização

CREATE PROCEDURE dbo.usp_update_watermark
    @table_name  VARCHAR(200),
    @new_wm      DATETIME2
AS
UPDATE dbo.watermark_control
SET    last_watermark = @new_wm
WHERE  table_name = @table_name;

Parametrizar para múltiplas tabelas

// Pipeline ForEach para processar N tabelas:
1. Lookup → SELECT table_name, source_query FROM config.tables_to_ingest
2. ForEach (items: @{activity('Lookup_Tables').output.value})
   → Execute Pipeline: PL_Incremental_Generic
     Parameters: table_name = @{item().table_name}

Conclusão

O padrão Watermark é o alicerce da ingestão incremental. Com uma tabela de controlo simples e um pipeline parametrizado, é possível gerir dezenas de tabelas com o mesmo pipeline genérico, reduzindo o volume de dados copiados e o tempo de execução em 90%+.

Partilhar: