Padrões de ingestão incremental no Azure Data Factory com Watermark
João Barros
16 de September de 2024
1 min de leitura
Carregar toda uma tabela fonte em cada execução do pipeline é ineficiente e não escala. O padrão Watermark no Azure Data Factory usa uma tabela de controlo para rastrear o estado da última ingestão e copiar apenas os registos novos ou alterados.
Tabela de controlo (Watermark)
-- Na base de dados de controlo (Azure SQL)
CREATE TABLE dbo.watermark_control (
table_name VARCHAR(200) PRIMARY KEY,
last_watermark DATETIME2 NOT NULL DEFAULT '1900-01-01'
);
INSERT INTO dbo.watermark_control VALUES ('dbo.Vendas', '1900-01-01');
Estrutura do pipeline ADF
Pipeline: PL_Incremental_Vendas
1. Lookup (ler watermark actual)
Query: SELECT last_watermark FROM watermark_control
WHERE table_name = 'dbo.Vendas'
2. Copy Data (copiar novos registos)
Source (SQL Server):
Query: SELECT * FROM dbo.Vendas
WHERE updated_at > '@{activity('Lookup_Watermark').output.firstRow.last_watermark}'
AND updated_at <= '@{pipeline().TriggerTime}'
Sink: ADLS Gen2 / Lakehouse (Parquet / Delta)
3. Stored Procedure (actualizar watermark)
Procedure: usp_update_watermark
Parameters: @table_name = 'dbo.Vendas',
@new_watermark = @{pipeline().TriggerTime}
Stored Procedure de actualização
CREATE PROCEDURE dbo.usp_update_watermark
@table_name VARCHAR(200),
@new_wm DATETIME2
AS
UPDATE dbo.watermark_control
SET last_watermark = @new_wm
WHERE table_name = @table_name;
Parametrizar para múltiplas tabelas
// Pipeline ForEach para processar N tabelas:
1. Lookup → SELECT table_name, source_query FROM config.tables_to_ingest
2. ForEach (items: @{activity('Lookup_Tables').output.value})
→ Execute Pipeline: PL_Incremental_Generic
Parameters: table_name = @{item().table_name}
Conclusão
O padrão Watermark é o alicerce da ingestão incremental. Com uma tabela de controlo simples e um pipeline parametrizado, é possível gerir dezenas de tabelas com o mesmo pipeline genérico, reduzindo o volume de dados copiados e o tempo de execução em 90%+.