Learn how to create and deploy an ETL (extract, transform, and load) pipeline for data orchestration using Lakeflow Declarative Pipelines and Auto Loader. An ETL pipeline implements the steps to read data from source systems, transform that data based on requirements, such as data quality checks and record de-duplication, and write the data to a target system, such as a data warehouse or a data lake.
In this tutorial, you will use Lakeflow Declarative Pipelines and Auto Loader to:
Ingest raw source data into a target table.
Transform the raw source data and write the transformed data to two target materialized views.
Query the transformed data.
For more information about Lakeflow Declarative Pipelines and Auto Loader, see Lakeflow Declarative Pipelines and What is Auto Loader?
Requirements
To complete this tutorial, you must meet the following requirements:
ALL PRIVILEGES or USE CATALOG and CREATE SCHEMA privileges on a catalog.
ALL PRIVILEGES or USE SCHEMA and CREATE VOLUME privileges on a schema.
The dataset used in this example is a subset of the Million Song Dataset, a collection of features and metadata for contemporary music tracks. This dataset is available in the sample datasets included in your Azure Databricks workspace.
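If you want a quick look at the source files before you build the pipeline, the following sketch lists a few of them from a notebook attached to any compute. The path matches the sample dataset used in this tutorial; nothing in this sketch is required by the later steps.

# List a few of the source files in the sample dataset (run in a Databricks notebook)
files = dbutils.fs.ls("/databricks-datasets/songs/data-001/")
for f in files[:5]:
    print(f.path, f.size)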
Step 1: Create a pipeline
First, you will create an ETL pipeline in Lakeflow Declarative Pipelines. Lakeflow Declarative Pipelines creates pipelines by resolving dependencies defined in notebooks or files (called source code) using Lakeflow Declarative Pipelines syntax. Each source code file can contain only one language, but you can add multiple language-specific notebooks or files to the pipeline. To learn more, see Lakeflow Declarative Pipelines.
Important
Leave the Source code field blank to automatically create and configure a notebook for source code authoring.
This tutorial uses serverless compute and Unity Catalog. For all configuration options that are not specified, use the default settings. If serverless compute is not enabled or supported in your workspace, you can complete the tutorial as written using default compute settings. If you use default compute settings, you must manually select Unity Catalog under Storage options in the Destination section of the Create pipeline UI.
To create a new ETL pipeline in Lakeflow Declarative Pipelines, follow these steps:
The pipeline UI appears for the new pipeline.
Step 2: Develop a pipeline
Important
Notebooks can only contain a single programming language. Do not mix Python and SQL code in pipeline source code notebooks.
In this step, you will use Databricks Notebooks to develop and validate source code for Lakeflow Declarative Pipelines interactively.
The code uses Auto Loader for incremental data ingestion. Auto Loader automatically detects and processes new files as they arrive in cloud object storage. To learn more, see What is Auto Loader?
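For context, here is a minimal sketch of Auto Loader used on its own, outside a declarative pipeline. The target table name, schema location, and checkpoint location are placeholders and are not part of this tutorial.

# Minimal Auto Loader sketch outside of a declarative pipeline.
# <catalog>, <schema>, and <volume> are placeholders.
(
    spark.readStream
        .format("cloudFiles")                                  # Auto Loader source
        .option("cloudFiles.format", "csv")                    # format of the incoming files
        .option("cloudFiles.schemaLocation", "/Volumes/<catalog>/<schema>/<volume>/_schemas/songs")
        .option("sep", "\t")
        .load("/databricks-datasets/songs/data-001/")
        .writeStream
        .option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/_checkpoints/songs")
        .trigger(availableNow=True)                            # process available files, then stop
        .toTable("<catalog>.<schema>.songs_raw_standalone")
)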
A blank source code notebook is automatically created and configured for the pipeline. The notebook is created in a new directory in your user directory. The name of the new directory and file match the name of your pipeline. For example, /Users/someone@example.com/my_pipeline/my_pipeline.
When developing a pipeline, you can choose either Python or SQL. Examples are included for both languages. Based on the language you choose, make sure the notebook's default language matches. To learn more about notebook support for Lakeflow Declarative Pipelines code development, see Develop and debug ETL pipelines with a notebook in Lakeflow Declarative Pipelines.
A link to access this notebook is under the Source code field in the Pipeline details panel. Click the link to open the notebook before proceeding to the next step.
Click Connect in the upper-right to open the compute configuration menu.
Hover over the name of the pipeline you created in Step 1.
Click Connect.
Next to your notebook's title at the top, select the notebook's default language (Python or SQL).
Copy and paste the following code into a cell in the notebook.
Python

# Import modules
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

# Define the path to the source data
file_path = f"/databricks-datasets/songs/data-001/"

# Define a streaming table to ingest data from a volume
schema = StructType(
    [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
    ]
)

@dlt.table(
    comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
)
def songs_raw():
    return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep", "\t")
        .load(file_path))

# Define a materialized view that validates data and renames a column
@dlt.table(
    comment="Million Song Dataset with data cleaned and prepared for analysis."
)
@dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
@dlt.expect("valid_title", "song_title IS NOT NULL")
@dlt.expect("valid_duration", "duration > 0")
def songs_prepared():
    return (
        spark.read.table("songs_raw")
            .withColumnRenamed("title", "song_title")
            .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
    )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
    comment="A table summarizing counts of songs released by the artists who released the most songs each year."
)
def top_artists_by_year():
    return (
        spark.read.table("songs_prepared")
            .filter(expr("year > 0"))
            .groupBy("artist_name", "year")
            .count().withColumnRenamed("count", "total_number_of_songs")
            .sort(desc("total_number_of_songs"), desc("year"))
    )
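The @dlt.expect decorators above record violations in the pipeline's data quality metrics but keep the offending rows. If you instead want to drop invalid rows or fail the update, dlt provides stricter variants. The following is an illustrative sketch only and is not part of this tutorial's pipeline:

# Sketch only: a stricter variant that drops rows with a NULL title and fails
# the update if any row has a non-positive duration.
@dlt.table(comment="Prepared songs with invalid rows dropped.")
@dlt.expect_or_drop("valid_title", "song_title IS NOT NULL")
@dlt.expect_or_fail("valid_duration", "duration > 0")
def songs_prepared_strict():
    return (
        spark.read.table("songs_raw")
            .withColumnRenamed("title", "song_title")
    )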
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE songs_raw
COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
AS SELECT *
FROM STREAM read_files(
'/databricks-datasets/songs/data-001/part*',
format => "csv",
header => "false",
delimiter => "\t",
schema => """
artist_id STRING,
artist_lat DOUBLE,
artist_long DOUBLE,
artist_location STRING,
artist_name STRING,
duration DOUBLE,
end_of_fade_in DOUBLE,
key INT,
key_confidence DOUBLE,
loudness DOUBLE,
release STRING,
song_hotnes DOUBLE,
song_id STRING,
start_of_fade_out DOUBLE,
tempo DOUBLE,
time_signature INT,
time_signature_confidence DOUBLE,
title STRING,
year INT,
partial_sequence STRING
""",
schemaEvolutionMode => "none");
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
CONSTRAINT valid_duration EXPECT (duration > 0)
)
COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
FROM songs_raw;
-- Define a materialized view that has a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
AS SELECT
artist_name,
year,
COUNT(*) AS total_number_of_songs
FROM songs_prepared
WHERE year > 0
GROUP BY artist_name, year
ORDER BY total_number_of_songs DESC, year DESC;
Click Start to start an update for the connected pipeline.
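After the update completes, you can sanity-check the target tables from a regular notebook (not from the pipeline source notebook). A quick sketch; replace the placeholders with the catalog and schema you configured for the pipeline:

# Row counts for the tables created by the pipeline (run in a regular notebook).
for table in ("songs_raw", "songs_prepared", "top_artists_by_year"):
    count = spark.table(f"<catalog>.<schema>.{table}").count()
    print(table, count)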
Step 3: Query the transformed data
In this step, you query the data processed in the ETL pipeline to analyze the song data in the Databricks SQL Editor. These queries use the prepared records created in the previous step.
First, run a query that finds the artists who have released the most songs each year since 1990.
In the sidebar, click SQL Editor.
Click the new tab icon and select Create new query from the menu.
Enter the following:
-- Which artists released the most songs each year in 1990 or later?
SELECT artist_name, total_number_of_songs, year
FROM <catalog>.<schema>.top_artists_by_year
WHERE year >= 1990
ORDER BY total_number_of_songs DESC, year DESC
Replace <catalog> and <schema> with the names of the catalog and schema that the table is in. For example, data_pipelines.songs_data.top_artists_by_year.
Click Run selected.
Now, run another query that finds songs with a 4/4 beat and danceable tempo.
Click the new tab icon and select Create new query from the menu.
Enter the following code:
-- Find songs with a 4/4 beat and danceable tempo
SELECT artist_name, song_title, tempo
FROM <catalog>.<schema>.songs_prepared
WHERE time_signature = 4 AND tempo BETWEEN 100 AND 140;
Replace <catalog> and <schema> with the names of the catalog and schema that the table is in. For example, data_pipelines.songs_data.songs_prepared.
Click Run selected.
Step 4: Create a job to run the pipeline
Next, create a workflow to automate the data ingestion, processing, and analysis steps using a Databricks job. Create a job with a single pipeline task that runs the pipeline you created in Step 1. Give the job a name, for example, Songs workflow, and give the pipeline task a name, for example, ETL_songs_data. Then click Run now to run the job.
See Monitoring and observability for Lakeflow Jobs for more information about job runs.
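If you prefer to create the job programmatically instead of through the UI, the following is a sketch using the Databricks SDK for Python. It assumes the SDK is installed and authenticated, and the pipeline ID is a placeholder you can copy from the pipeline's details page.

# Sketch: create and run a job with a single pipeline task using the Databricks SDK for Python.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()
job = w.jobs.create(
    name="Songs workflow",
    tasks=[
        jobs.Task(
            task_key="ETL_songs_data",
            pipeline_task=jobs.PipelineTask(pipeline_id="<pipeline-id>"),  # placeholder
        )
    ],
)
w.jobs.run_now(job_id=job.job_id)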
Step 5: Schedule the pipeline job
To run the ETL pipeline on a schedule, follow these steps:
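The schedule can be configured from the job page in the UI, or added programmatically. The following is a sketch using the Databricks SDK for Python, assuming a placeholder job ID and an example cron expression:

# Sketch: add a daily 06:00 UTC schedule to an existing job.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()
job_id = 123456789  # placeholder: use the ID of the job you created in Step 4
w.jobs.update(
    job_id=job_id,
    new_settings=jobs.JobSettings(
        schedule=jobs.CronSchedule(
            quartz_cron_expression="0 0 6 * * ?",  # Quartz cron: every day at 06:00
            timezone_id="UTC",
        )
    ),
)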