Learn how to create and deploy an ETL (extract, transform, and load) pipeline with change data capture (CDC) using Lakeflow Declarative Pipelines for data orchestration and Auto Loader. An ETL pipeline implements the steps to read data from source systems, transform that data based on requirements, such as data quality checks and record de-duplication, and write the data to a target system, such as a data warehouse or a data lake.
In this tutorial, you'll use data from a customers table in a MySQL database to:
- Ingest the raw CDC data into a customers_cdc table. Auto Loader will infer the schema and handle schema evolution.
- Add a customers_cdc_clean table to check the data quality using expectations. For example, the id should never be null, as you will use it to run your upsert operations.
- Perform an AUTO CDC ... INTO operation (doing the upserts) on the cleaned CDC data to apply the changes to the final customers table.
The goal is to ingest the raw data in near real time and build a table for your analyst team while ensuring data quality.
The tutorial uses the medallion Lakehouse architecture, where it ingests raw data through the bronze layer, cleans and validates data with the silver layer, and applies dimensional modeling and aggregation using the gold layer. See What is the medallion lakehouse architecture? for more information.
The flow you will implement looks like this:
For more information about Lakeflow Declarative Pipelines, Auto Loader, and CDC, see Lakeflow Declarative Pipelines, What is Auto Loader?, and What is change data capture (CDC)?
Requirements
To complete this tutorial, you must meet the following requirements:
- Permission to create a new schema in a catalog: either ALL PRIVILEGES, or USE CATALOG and CREATE SCHEMA.
- Permission to create a new volume in an existing schema: either ALL PRIVILEGES, or USE SCHEMA and CREATE VOLUME.
What is change data capture (CDC)?
Change data capture (CDC) is the process that captures the changes in records made to a transactional database (for example, MySQL or PostgreSQL) or a data warehouse. CDC captures operations such as row deletion, insertion, and updates, typically as a stream to re-materialize the table in external systems. CDC enables incremental loading while eliminating the need for bulk load updates.
note
To simplify the tutorial, you skip setting up an external CDC system. Consider it up and running, saving the CDC data as JSON files in blob storage (S3, ADLS, or GCS).
Capturing CDC
A variety of CDC tools are available. One of the leading open source solutions is Debezium, but other implementations that simplify the data source exist, such as Fivetran, Qlik Replicate, StreamSets, Talend, Oracle GoldenGate, and AWS DMS.
In this tutorial, you use CDC data from an external system like Debezium or DMS. Debezium captures every changed row. It typically sends the history of data changes to Kafka logs or saves them as a file.
You must ingest the CDC information from the customers table (JSON format), check that it is correct, and then materialize the customer table in the Lakehouse.
For each change, you will receive a JSON message containing all the fields of the row being updated (id, firstname, lastname, email, address). In addition, you will have extra metadata information, including:
- operation: An operation code, typically (DELETE, APPEND, UPDATE).
- operation_date: The date and timestamp for the record for each operation action.
Tools like Debezium can produce more advanced output, such as the row value before the change, but this tutorial omits them for simplicity.
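For illustration, a single change record in this tutorial's dataset looks roughly like the following. The values are made up, and a real CDC tool may add more fields:
Python
# One CDC change record, shown as a Python dict (illustrative values only)
example_change = {
    "id": "6f2b0c6e-4a8e-4f4e-9d0b-1c2d3e4f5a6b",  # primary key used for upserts
    "firstname": "Ada",
    "lastname": "Lovelace",
    "email": "ada.lovelace@example.com",
    "address": "42 Analytical Engine Way",
    "operation": "UPDATE",                          # APPEND, DELETE, or UPDATE
    "operation_date": "01-15-2025 09:30:00",        # %m-%d-%Y %H:%M:%S, used to sequence changes
}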
Step 0: Set up the tutorial data
First, you must create a new notebook and install the demo files used in this tutorial into your workspace.
Click New in the upper-left corner.
Click Notebook.
Change the title of the notebook from Untitled Notebook <date and time> to Pipelines tutorial setup.
Next to your notebook's title at the top, set the notebook's default language to Python.
Open the notebook environment panel by clicking (Environment) in the right sidebar of the notebook editor.
This notebook uses faker
to create fake data for an example. Add faker
as a dependency of the notebook, and then click Apply at the bottom of the environment panel.
To generate the dataset used in the tutorial, enter the following code in the first cell and type Shift + Enter to run the code:
Python
catalog = "main"
schema = dbName = db = "dbdemos_dlt_cdc"
volume_name = "raw_data"
spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`')
spark.sql(f'USE CATALOG `{catalog}`')
spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`')
spark.sql(f'USE SCHEMA `{schema}`')
spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`')
volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}"
try:
    dbutils.fs.ls(volume_folder + "/customers")
except:
    print(f"folder doesn't exist, generating the data under {volume_folder}...")
    from pyspark.sql import functions as F
    from faker import Faker
    from collections import OrderedDict
    import uuid
    import random

    fake = Faker()
    fake_firstname = F.udf(fake.first_name)
    fake_lastname = F.udf(fake.last_name)
    fake_email = F.udf(fake.ascii_company_email)
    fake_date = F.udf(lambda: fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
    fake_address = F.udf(fake.address)

    # Weighted mix of CDC operation codes, including a few null operations and null ids
    operations = OrderedDict([("APPEND", 0.5), ("DELETE", 0.1), ("UPDATE", 0.3), (None, 0.01)])
    fake_operation = F.udf(lambda: fake.random_elements(elements=operations, length=1)[0])
    fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)

    # Generate 100,000 fake customer change records and write them as JSON to the volume
    df = spark.range(0, 100000).repartition(100)
    df = df.withColumn("id", fake_id())
    df = df.withColumn("firstname", fake_firstname())
    df = df.withColumn("lastname", fake_lastname())
    df = df.withColumn("email", fake_email())
    df = df.withColumn("address", fake_address())
    df = df.withColumn("operation", fake_operation())
    df_customers = df.withColumn("operation_date", fake_date())
    df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder + "/customers")
To preview the data used in this tutorial, enter this code in the next cell and type Shift + Enter to run the code. Remember that if you changed the path in the previous cell, you will need to make the equivalent changes here.
Python
display(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))
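Optionally, you can also sanity-check the mix of CDC operations in the generated files. This uses the same default catalog, schema, and volume names as the setup cell:
Python
# Count the generated records per CDC operation type (APPEND, DELETE, UPDATE, or null)
from pyspark.sql import functions as F

raw = spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers")
display(raw.groupBy("operation").count().orderBy(F.desc("count")))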
Step 1: Create a pipeline
First, you will create an ETL pipeline in Lakeflow Declarative Pipelines. Lakeflow Declarative Pipelines creates pipelines by resolving dependencies defined in notebooks or files (called source code) using Lakeflow Declarative Pipelines syntax. Each source code file can contain only one language, but you can add multiple language-specific notebooks or files to the pipeline. To learn more, see Lakeflow Declarative Pipelines.
important
Leave the Source code field blank to automatically create and configure a notebook for source code authoring.
This tutorial uses serverless compute and Unity Catalog. For all configuration options that are not specified, use the default settings. If serverless compute is not enabled or supported in your workspace, you can complete the tutorial as written using default compute settings. If you use default compute settings, you must manually select Unity Catalog under Storage options in the Destination section of the Create pipeline UI.
To create a new ETL pipeline in Lakeflow Declarative Pipelines, follow these steps:
The pipeline UI appears for the new pipeline.
A blank source code notebook is automatically created and configured for the pipeline. The notebook is created in a new directory in your user directory. The name of the new directory and file match the name of your pipeline. For example, /Users/someone@example.com/my_pipeline/my_pipeline.
important
Notebooks can only contain a single programming language. Do not mix Python and SQL code in pipeline source code notebooks.
When developing Lakeflow Declarative Pipelines, you can choose either Python or SQL. This tutorial includes examples for both languages. Based on your language choice, check that you have selected the matching default notebook language.
To learn more about notebook support for Lakeflow Declarative Pipelines code development, see Develop and debug ETL pipelines with a notebook in Lakeflow Declarative Pipelines.
Step 2: Incrementally ingest data with Auto Loader
The first step is to ingest the raw data from the cloud storage into a bronze layer.
This can be challenging for multiple reasons, as you must infer the schema, handle schema evolution, and scale to a large number of incoming files.
Auto Loader simplifies this ingestion, including schema inference and schema evolution, while scaling to millions of incoming files. Auto Loader is available in Python using cloudFiles and in SQL using SELECT * FROM STREAM read_files(...), and can be used with a variety of formats (JSON, CSV, Apache Avro, and so on).
Defining the table as a streaming table will guarantee that you only consume new incoming data. If you do not define it as a streaming table, you will scan and ingest all the available data. See Streaming tables for more information.
To ingest the incoming data using Auto Loader, copy and paste the following code into the first cell in the notebook. You can use Python or SQL, depending on the notebook's default language you chose in the previous step.
Python
from dlt import *
from pyspark.sql.functions import *
dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")
@append_flow(
    target = "customers_cdc_bronze",
    name = "customers_bronze_ingest_flow"
)
def customers_bronze_ingest_flow():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers")
    )
SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
CREATE FLOW customers_bronze_ingest_flow AS
INSERT INTO customers_cdc_bronze BY NAME
SELECT *
FROM STREAM read_files(
"/Volumes/main/dbdemos_dlt_cdc/raw_data/customers",
format => "json",
inferColumnTypes => "true"
)
Click Start to start an update for the connected pipeline.
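After the update completes, you can optionally run a quick check from a regular notebook (not the pipeline). The table name below assumes the pipeline publishes to the main catalog and dbdemos_dlt_cdc schema used earlier:
Python
# Rows that Auto Loader could not fully parse carry a non-null _rescued_data value;
# the silver layer in the next step drops such rows with an expectation.
display(spark.sql("""
  SELECT count(*) AS rescued_rows
  FROM main.dbdemos_dlt_cdc.customers_cdc_bronze
  WHERE _rescued_data IS NOT NULL
"""))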
Step 3: Clean the data and track quality with expectations
After the bronze layer is defined, you will create the silver layer by adding expectations to control the data quality by checking the following conditions:
- The id must never be null.
- The CDC operation must be a valid type (APPEND, DELETE, or UPDATE).
- The json must have been read adequately by Auto Loader (that is, _rescued_data is null).
The row will be dropped if one of these conditions isn't respected.
See Manage data quality with pipeline expectations for more information.
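For intuition, the three ON VIOLATION DROP ROW expectations you are about to add behave roughly like the following batch filter. This is an illustration only, not pipeline code, and it assumes the bronze table is published to main.dbdemos_dlt_cdc:
Python
# Rough batch equivalent of the three drop expectations defined in the next cell
valid_rows = (
    spark.read.table("main.dbdemos_dlt_cdc.customers_cdc_bronze")
    .where("_rescued_data IS NULL")                        # JSON parsed cleanly by Auto Loader
    .where("id IS NOT NULL")                               # the key needed for upserts
    .where("operation IN ('APPEND', 'DELETE', 'UPDATE')")  # known CDC operation codes
)
display(valid_rows.limit(10))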
Click Edit and Insert cell below to insert a new empty cell.
To create a silver layer with a cleansed table and impose constraints, copy and paste the following code into the new cell in the notebook.
Python
dlt.create_streaming_table(
    name = "customers_cdc_clean",
    expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
)

@append_flow(
    target = "customers_cdc_clean",
    name = "customers_cdc_clean_flow"
)
def customers_cdc_clean_flow():
    return (
        dlt.read_stream("customers_cdc_bronze")
        .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
    )
SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
)
COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
CREATE FLOW customers_cdc_clean_flow AS
INSERT INTO customers_cdc_clean BY NAME
SELECT * FROM STREAM customers_cdc_bronze;
Click Start to start an update for the connected pipeline.
Step 4: Materialize the customers table with AUTO CDC
The customers table will contain the most up-to-date view and be a replica of the original table.
This is nontrivial to implement manually. You must consider things like data deduplication to keep the most recent row.
However, Lakeflow Declarative Pipelines solves these challenges with the AUTO CDC operation.
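To see why, here is a rough sketch of a manual SCD type 1 merge done as a batch job outside the pipeline. It is an illustration only (it assumes the silver table is published to main.dbdemos_dlt_cdc), and it still misses what AUTO CDC gives you, such as incremental processing and out-of-order handling:
Python
from pyspark.sql import Window, functions as F

# Keep only the most recent change per id, then drop keys whose latest change is a DELETE.
w = Window.partitionBy("id").orderBy(F.col("operation_date").desc())
latest_customers = (
    spark.read.table("main.dbdemos_dlt_cdc.customers_cdc_clean")
    .withColumn("rn", F.row_number().over(w))
    .where("rn = 1 AND operation <> 'DELETE'")
    .drop("rn", "operation", "operation_date", "_rescued_data")
)
display(latest_customers.limit(10))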
Click Edit and Insert cell below to insert a new empty cell.
To process the CDC data using AUTO CDC
in Lakeflow Declarative Pipelines, copy and paste the following code into the new cell in the notebook.
Python
dlt.create_streaming_table(name="customers", comment="Clean, materialized customers")
dlt.create_auto_cdc_flow(
target="customers",
source="customers_cdc_clean",
keys=["id"],
sequence_by=col("operation_date"),
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "operation_date", "_rescued_data"],
)
SQL
CREATE OR REFRESH STREAMING TABLE customers;
CREATE FLOW customers_cdc_flow
AS AUTO CDC INTO customers
FROM stream(customers_cdc_clean)
KEYS (id)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY operation_date
COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
STORED AS SCD TYPE 1;
Click Start to start an update for the connected pipeline.
Step 5: Track historical changes with SCD type 2
It's often required to create a table tracking all the changes resulting from APPEND, UPDATE, and DELETE operations.
Delta supports change data feed (CDF), and table_changes can query the table modifications in SQL and Python. However, CDF's main use case is to capture changes in a pipeline and not create a full view of the table changes from the beginning.
Things get especially complex to implement if you have out-of-order events. If you must sequence your changes by a timestamp and receive a modification that happened in the past, then you must append a new entry in your SCD table and update the previous entries.
Lakeflow Declarative Pipelines removes this complexity and lets you create a separate table containing all the modifications from the beginning of time. This table can then be used at scale, with specific partition or Z-order columns if required. Out-of-order events are handled out of the box based on the sequence_by key.
To create an SCD2 table, we must use the option: STORED AS SCD TYPE 2
in SQL or stored_as_scd_type="2"
in Python.
note
You can also limit which columns the feature tracks using the option: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
Click Edit and Insert cell below to insert a new empty cell.
Copy and paste the following code into the new cell in the notebook.
Python
dlt.create_streaming_table(
name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
)
dlt.create_auto_cdc_flow(
target="customers_history",
source="customers_cdc_clean",
keys=["id"],
sequence_by=col("operation_date"),
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "operation_date", "_rescued_data"],
stored_as_scd_type="2",
)
SQL
CREATE OR REFRESH STREAMING TABLE customers_history;
CREATE FLOW customers_history_cdc
AS AUTO CDC INTO
customers_history
FROM stream(customers_cdc_clean)
KEYS (id)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY operation_date
COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
STORED AS SCD TYPE 2;
Click Start to start an update for the connected pipeline.
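After the update completes, you can inspect the history table from a notebook. With SCD type 2, each version of a row is bounded by __START_AT and __END_AT columns, and the current version has a null __END_AT. The query assumes the main.dbdemos_dlt_cdc target schema:
Python
# Show a few customers with all of their versions, oldest first
display(spark.sql("""
  SELECT id, address, email, __START_AT, __END_AT
  FROM main.dbdemos_dlt_cdc.customers_history
  ORDER BY id, __START_AT
  LIMIT 20
"""))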
Step 6: Create a gold layer materialized view
The customers_history table contains all historical changes a user has made to their information. You will now create a simple materialized view in the gold layer that keeps track of who has changed their information the most. This could be used for fraud detection analysis or user recommendations in a real-world scenario. Additionally, applying changes with SCD2 has already removed duplicates, so you can directly count the rows per user ID.
Click Edit and Insert cell below to insert a new empty cell.
Copy and paste the following code into the new cell in the notebook.
Python
@dlt.table(
    name = "customers_history_agg",
    comment = "Aggregated customer history"
)
def customers_history_agg():
    return (
        dlt.read("customers_history")
        .groupBy("id")
        .agg(
            count("address").alias("address_count"),
            count("email").alias("email_count"),
            count("firstname").alias("firstname_count"),
            count("lastname").alias("lastname_count")
        )
    )
SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
SELECT
id,
count("address") as address_count,
count("email") AS email_count,
count("firstname") AS firstname_count,
count("lastname") AS lastname_count
FROM customers_history
GROUP BY id
Click Start to start an update for the connected pipeline.
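As a quick usage example, you could query the new aggregate from a notebook to surface the customers with the most recorded changes, which is the kind of signal the analyst team might feed into fraud review (same assumed catalog and schema as above):
Python
# Customers with the most address versions recorded in their history
display(spark.sql("""
  SELECT id, address_count, email_count
  FROM main.dbdemos_dlt_cdc.customers_history_agg
  ORDER BY address_count DESC
  LIMIT 10
"""))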
Step 7: Create a job to run the pipeline
Next, create a workflow to automate the data ingestion, processing, and analysis steps using a Databricks job.
Create a new job and give it a name, for example, CDC customers workflow. Add a pipeline task that runs the pipeline you created, and give the task a name, for example, ETL_customers_data. Then run the job.
See Monitoring and observability for Lakeflow Jobs for more information about job runs.
Step 8: Schedule the job
To run the ETL pipeline on a schedule, add a scheduled trigger to the job you created in the previous step and choose the frequency you want.
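The UI is the simplest way to do this, but as a rough alternative sketch, you could also create and schedule the job programmatically with the Databricks SDK for Python. Everything below is illustrative: the job name, task key, cron expression, and pipeline ID are placeholders you would replace with your own values.
Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

# Create a job with one pipeline task and a daily schedule (placeholder values).
created = w.jobs.create(
    name="CDC customers workflow",
    tasks=[
        jobs.Task(
            task_key="ETL_customers_data",
            pipeline_task=jobs.PipelineTask(pipeline_id="<your-pipeline-id>"),
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 6 * * ?",  # every day at 06:00
        timezone_id="UTC",
    ),
)
print(f"Created job {created.job_id}")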