Liquid clustering replaces table partitioning and ZORDER
to simplify data layout decisions and optimize query performance. It provides the flexibility to redefine clustering keys without rewriting existing data, allowing data layout to evolve alongside analytic needs over time. Liquid clustering applies to both Streaming Tables and Materialized Views.
important
Liquid clustering is generally available for Delta Lake tables and in Public Preview for managed Apache Iceberg tables.
For all Delta Lake tables with liquid clustering enabled, Databricks recommends using Databricks Runtime 15.2 and above. Public Preview support with limitations is available in Databricks Runtime 13.3 LTS and above. Row-level concurrency is supported in Databricks Runtime 13.3 LTS and above and is generally available in Databricks Runtime 14.2 and above for all tables with deletion vectors enabled. See Isolation levels and write conflicts on Databricks.
For all Apache Iceberg tables with liquid clustering enabled, Databricks Runtime 16.4 LTS and above is required.
What is liquid clustering used for?âDatabricks recommends liquid clustering for all new tables, which includes both Streaming Tables (STs) and Materialized Views (MVs). The following are examples of scenarios that benefit from clustering:
You can enable liquid clustering on an existing table or during table creation. Clustering is not compatible with partitioning or ZORDER
, and requires that you use Databricks to manage all layout and optimization operations for data in your table. After liquid clustering is enabled, run OPTIMIZE
jobs as usual to incrementally cluster data. See How to trigger clustering.
To enable liquid clustering, add the CLUSTER BY
phrase to a table creation statement, as in the examples below:
note
In Databricks Runtime 14.2 and above, you can use DataFrame APIs and DeltaTable API in Python or Scala to enable liquid clustering for Delta Lake tables.
SQL
CREATE TABLE table1(col0 INT, col1 string) CLUSTER BY (col0);
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0)
LOCATION 'table_location'
AS SELECT * FROM table1;
CREATE TABLE table3 LIKE table1;
For Apache Iceberg, you must explicitly disable deletion vectors and Row IDs when enabling Liquid Clustering on a managed Iceberg table.
Python
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute())
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
Scala
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
In Databricks Runtime 16.0 and above, you can create tables with liquid clustering enabled using Structured Streaming writes, as in the following examples:
SQL
CREATE TABLE table1 (
col0 STRING,
col1 DATE,
col2 BIGINT
)
CLUSTER BY (col0, col1);
Python
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
Scala
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
warning
Delta tables created with liquid clustering enabled have numerous Delta table features enabled at creation and use Delta writer version 7 and reader version 3. You can override the enablement of some of these features. See Override default feature enablement (optional).
Table protocol versions cannot be downgraded, and tables with clustering enabled are not readable by Delta Lake clients that do not support all enabled Delta reader protocol table features. See Delta Lake feature compatibility and protocols.
Enable liquid clustering on an existing unpartitioned Delta table using the following syntax:
SQL
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
For Apache Iceberg, you must explicitly disable deletion vectors and Row IDs when enabling liquid clustering on an existing managed Iceberg table.
important
The default behavior does not apply clustering to previously written data. To force reclustering for all records, you must use OPTIMIZE FULL
. See Force reclustering for all records.
To remove clustering keys, use the following syntax:
SQL
ALTER TABLE table_name CLUSTER BY NONE;
Automatic liquid clusteringâ
In Databricks Runtime 15.4 LTS and above, you can enable automatic liquid clustering for Unity Catalog managed Delta tables. With automatic liquid clustering enabled, Databricks intelligently chooses clustering keys to optimize query performance. You enable automatic liquid clustering using the CLUSTER BY AUTO
clause.
When enabled, automatic key selection and clustering operations run asynchronously as a maintenance operation and require that predictive optimization is enabled for the table. See Predictive optimization for Unity Catalog managed tables.
To identify clustering keys, Databricks analyzes the table's historical query workload and identifies the best candidate columns. Clustering keys are changed when the predicted cost savings from data skipping improvements outweigh the data clustering cost.
If the way you query your data changes over time or query performance indicates changes in your data distributions, automatic liquid clustering selects new keys to optimize performance.
If a key was not selected by automatic liquid clustering, the reason can be:
You can apply automatic liquid clustering for all Unity Catalog managed tables, regardless of data and query characteristics. These features provide intelligent optimization of data layout based on your data usage patterns, and the heuristics will decide whether it's cost-beneficial to select clustering keys.
note
You can read or write tables with automatic clustering enabled from all Databricks Runtime versions that support liquid clustering. However, intelligent key selection relies on metadata introduced in Databricks Runtime 15.4 LTS. Use Databricks Runtime 15.4 LTS or above to ensure that automatically selected keys benefit all of your workloads and that these workloads are considered when selecting new keys.
Enable or disable automatic liquid clusteringâTo enable or disable automatic liquid clustering on a new or existing table, use the following syntax:
SQL
CREATE OR REPLACE TABLE table1(column01 int, column02 string) CLUSTER BY AUTO;
ALTER TABLE table1 CLUSTER BY AUTO;
ALTER TABLE table1 CLUSTER BY NONE;
ALTER TABLE table1 CLUSTER BY (column01, column02);
note
If you run CREATE OR REPLACE table_name
without specifying CLUSTER BY AUTO
and the table already exists and has automatic liquid clustering enabled, the AUTO
setting and the clustering columns for the table (if applied) are preserved when the table is replaced. Predictive optimization also maintains the historical query workload for this table to identify the best clustering keys.
Python
df = spark.read.table("table1")
df.write
.format("delta")
.option(âclusterByAutoâ, âtrueâ)
.saveAsTable(...)
df.write
.format("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option(âclusterByAutoâ, âtrueâ)
.saveAsTable(...)
df.writeTo(...).using("delta")
.option(âclusterByAutoâ, âtrueâ)
.create()
df.writeTo(...).using("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option(âclusterByAutoâ, âtrueâ)
.create()
spark.readStream.table("source_table")
.writeStream
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
spark.readStream.table("source_table")
.writeStream
.clusterBy("column1", "column2")
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
note
The Python API is available in Databricks Runtime 16.4 and above.
When .clusterBy
is used together with .option('clusterByAuto', 'true)
, then:
.clusterBy
..clusterBy
can be accepted once. For example, the columns specified by .clusterBy
are only set if the table has no clustering columns set already, either by you or by automatic liquid clustering.You can only use Python when creating or replacing a table. Use SQL to change the clusterByAuto
status of an existing table.
To check if a table has automatic liquid clustering enabled, use DESCRIBE TABLE
or SHOW TBLPROPERTIES
.
If automatic liquid clustering is enabled, the clusterByAuto
property is set to true
. The clusteringColumns
property shows the current clustering columns that were automatically or manually selected.
Automatic liquid clustering is not available for Apache Iceberg.
Override default feature enablement (optional)âYou can override default behavior that enables Delta table features during liquid clustering enablement. This prevents the reader and writer protocols associated with those table features from being upgraded. You must have an existing table to complete the following steps:
Use ALTER TABLE
to set the table property that disables one or more features. For example, to disable deletion vectors run the following:
SQL
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
Enable liquid clustering on the table by running the following:
SQL
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
The following table provides information on the Delta features you can override and how enablement impacts compatibility with Databricks Runtime versions.
Choose clustering keysâDatabricks recommends automatic liquid clustering for supported tables. See Automatic liquid clustering.
Databricks recommends choosing clustering keys based on the columns most frequently used in query filters. Clustering keys can be defined in any order. If two columns are highly correlated, you only need to include one of them as a clustering key.
You can specify up to four clustering keys. For smaller tables (under 10 TB), using more clustering keys (for example, four) can degrade performance when filtering on a single column compared to using fewer clustering keys (for example, two). However, as table size increases, the performance difference with using more clustering keys for single-column queries becomes negligible.
You can only specify columns that have statistics collected as clustering keys. By default, the first 32 columns in a Delta table have statistics collected. See Specify Delta statistics columns.
Clustering supports the following data types for clustering keys:
If you're converting an existing table, consider the following recommendations:
Write data to a clustered tableâTo write to a clustered Delta table, you must use a Delta writer client that supports all Delta write protocol table features used by liquid clustering. To write to a clustered Iceberg table, you can use Unity Catalog's Iceberg REST Catalog API. On Databricks, you must use Databricks Runtime 13.3 LTS and above.
Operations that cluster on write include the following:
INSERT INTO
operationsCTAS
and RTAS
statementsCOPY INTO
from Parquet formatspark.write.mode("append")
Clustering on write only triggers when data in the transaction meets a size threshold. These thresholds vary by the number of clustering columns and are lower for Unity Catalog managed tables than other Delta tables.
Because not all operations apply liquid clustering, Databricks recommends frequently running OPTIMIZE
to ensure that all data is efficiently clustered.
Structured Streaming workloads support clustering on write when you set the Spark config spark.databricks.delta.liquid.eagerClustering.streaming.enabled
to true
. Clustering for these workloads only triggers if at least one of the last five streaming updates exceeds a size threshold from the table above.
Predictive optimization automatically runs OPTIMIZE commands for enabled tables. See Predictive optimization for Unity Catalog managed tables. When using Predictive optimization, Databricks recommends disabling any scheduled OPTIMIZE jobs.
To trigger clustering, you must use Databricks Runtime 13.3 LTS or above. Use the OPTIMIZE
command on your table:
Liquid clustering is incremental, meaning that data is only rewritten as necessary to accommodate data that needs to be clustered. Data files with clustering keys that do not match the data to be clustered are not rewritten.
If you are not using predictive optimization, Databricks recommends scheduling regular OPTIMIZE
jobs to cluster data. For tables experiencing many updates or inserts, Databricks recommends scheduling an OPTIMIZE
job every one or two hours. Because liquid clustering is incremental, most OPTIMIZE
jobs for clustered tables run quickly.
In Databricks Runtime 16.0 and above, you can force reclustering of all records in a table with the following syntax:
SQL
OPTIMIZE table_name FULL;
important
Running OPTIMIZE FULL
reclusters all existing data as necessary. For large tables that have not previously been clustered on the specified keys, this operation might take hours.
Run OPTIMIZE FULL
when you enable clustering for the first time or change clustering keys. If you have previously run OPTIMIZE FULL
and there has been no change to clustering keys, OPTIMIZE FULL
runs the same as OPTIMIZE
. In this scenario, OPTIMIZE
uses an incremental approach and only rewrites files that haven't previously been compacted. Always use OPTIMIZE FULL
to ensure that data layout reflects the current clustering keys.
You can read data in a clustered Delta table using any Delta Lake client that supports reading deletion vectors. Using the Iceberg REST Catalog API, you can read data in a clustered Iceberg table.
SQL
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
Change clustering keysâ
You can change clustering keys for a table at any time by running an ALTER TABLE
command, as in the following example:
SQL
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
When you change clustering keys, subsequent OPTIMIZE
and write operations use the new clustering approach, but existing data is not rewritten.
You can also turn off clustering by setting the keys to NONE
, as in the following example:
SQL
ALTER TABLE table_name CLUSTER BY NONE;
Setting cluster keys to NONE
does not rewrite data that has already been clustered, but prevents future OPTIMIZE
operations from using clustering keys.
You can enable liquid clustering on managed Iceberg tables from external Iceberg engines. To enable liquid clustering, specify partition columns when creating a table. Unity Catalog interprets the partitions as clustering keys. For example, run the command below in OSS Spark:
SQL
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY c1;
You can disable liquid clustering:
SQL
ALTER TABLE main.schema.icebergTable DROP PARTITION FIELD c2;
You can change clustering keys using Iceberg partition evolution:
SQL
ALTER TABLE main.schema.icebergTable ADD PARTITION FIELD c2;
If you specify a partition using a bucket transform, Unity Catalog will drop the expression and use the column as a clustering key:
SQL
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY (bucket(c1, 10));
See how table is clusteredâ
You can use DESCRIBE
commands to see the clustering keys for a table, as in the following examples:
SQL
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
Compatibility for tables with liquid clusteringâ
Tables created with liquid clustering in Databricks Runtime 14.1 and above use v2 checkpoints by default. You can read and write tables with v2 checkpoints in Databricks Runtime 13.3 LTS and above.
You can disable v2 checkpoints and downgrade table protocols to read tables with liquid clustering in Databricks Runtime 12.2 LTS and above. See Drop a Delta Lake table feature and downgrade table protocol.
LimitationsâThe following limitations exist:
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4