(+351) 21 24 10006  ·  info@bconcepts.pt
Carnaxide, Lisboa
Azure Data Factory
Azure Data Factory 1 min

Patrones de ingesta incremental en Azure Data Factory con Watermark

João Barros 16 de September de 2024 1 min de lectura

Cargar toda una tabla origen en cada ejecución del pipeline es ineficiente y no escala. El patrón Watermark en Azure Data Factory usa una tabla de control para rastrear el estado de la última ingesta y copiar solo los registros nuevos o modificados.

Tabla de control (Watermark)

-- En la base de datos de control (Azure SQL)
CREATE TABLE dbo.watermark_control (
    table_name      VARCHAR(200) PRIMARY KEY,
    last_watermark  DATETIME2   NOT NULL DEFAULT '1900-01-01'
);

INSERT INTO dbo.watermark_control VALUES ('dbo.Ventas', '1900-01-01');

Estructura del pipeline ADF

Pipeline: PL_Incremental_Ventas

1. Lookup (leer el watermark actual)
   Query: SELECT last_watermark FROM watermark_control
          WHERE table_name = 'dbo.Ventas'

2. Copy Data (copiar nuevos registros)
   Source (SQL Server):
     Query: SELECT * FROM dbo.Ventas
            WHERE updated_at > '@{activity('Lookup_Watermark').output.firstRow.last_watermark}'
             AND updated_at <= '@{pipeline().TriggerTime}'
   Sink: ADLS Gen2 / Lakehouse (Parquet / Delta)

3. Stored Procedure (actualizar el watermark)
   Procedure: usp_update_watermark
   Parameters: @table_name = 'dbo.Ventas',
               @new_watermark = @{pipeline().TriggerTime}

Stored Procedure de actualización

CREATE PROCEDURE dbo.usp_update_watermark
    @table_name  VARCHAR(200),
    @new_wm      DATETIME2
AS
UPDATE dbo.watermark_control
SET    last_watermark = @new_wm
WHERE  table_name = @table_name;

Parametrizar para múltiples tablas

// Pipeline ForEach para procesar N tablas:
1. Lookup → SELECT table_name, source_query FROM config.tables_to_ingest
2. ForEach (items: @{activity('Lookup_Tables').output.value})
   → Execute Pipeline: PL_Incremental_Generic
     Parameters: table_name = @{item().table_name}

Conclusión

El patrón Watermark es la base de la ingesta incremental. Con una tabla de control simple y un pipeline parametrizado, es posible gestionar decenas de tablas con el mismo pipeline genérico, reduciendo el volumen de datos copiados y el tiempo de ejecución en un 90%+.

Compartir: