Source: https://docs.databricks.com/aws/en/optimizations/isolation-level

Isolation levels and write conflicts on Databricks

The isolation level of a table defines the degree to which a transaction must be isolated from modifications made by concurrent operations. Write conflicts on Databricks depend on the isolation level.

Delta Lake provides ACID transaction guarantees between reads and writes. See What are ACID guarantees on Databricks?.

note

Databricks uses Delta Lake for all tables by default. This article describes behavior for Delta Lake on Databricks.

important

Metadata changes cause all concurrent write operations to fail. These operations include changes to table protocol, table properties, or data schema.

Streaming reads fail when they encounter a commit that changes table metadata. If you want the stream to continue, you must restart it. For recommended methods, see Production considerations for Structured Streaming.

The following are examples of queries that change metadata:

SQL


ALTER TABLE table_name SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable');


ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);


ALTER TABLE table_name DROP FEATURE deletionVectors;


REORG TABLE table_name APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2));


ALTER TABLE table_name ADD COLUMNS (col_name STRING);

Write conflicts with row-level concurrency

Row-level concurrency reduces conflicts between concurrent write operations by detecting changes at the row level, automatically resolving conflicts that occur when concurrent writes update or delete different rows in the same data file.
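As a conceptual illustration (a pure-Python sketch, not Delta Lake's actual implementation), the difference between file-level and row-level conflict detection can be modeled by tracking which rows of each data file a transaction modifies:

```python
# Illustrative sketch only: a transaction's writes are modeled as a mapping
# from data file name to the set of row IDs it modifies in that file.
# This is NOT Delta Lake code; it only demonstrates the detection idea.

def file_level_conflict(txn_a, txn_b):
    """File-level detection: any shared data file is a conflict."""
    return bool(set(txn_a) & set(txn_b))

def row_level_conflict(txn_a, txn_b):
    """Row-level detection: only modifications to the same rows conflict."""
    for shared_file in set(txn_a) & set(txn_b):
        if txn_a[shared_file] & txn_b[shared_file]:
            return True
    return False

# Two concurrent writers touch *different* rows of the same file "part-0".
update_a = {"part-0": {1, 2}}   # e.g. an UPDATE rewriting rows 1 and 2
delete_b = {"part-0": {7, 8}}   # e.g. a DELETE removing rows 7 and 8

print(file_level_conflict(update_a, delete_b))  # True: same file touched
print(row_level_conflict(update_a, delete_b))   # False: disjoint rows
```

With file-level detection the two writers conflict because they rewrite the same file; with row-level detection the conflict is resolved automatically because the modified row sets are disjoint.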

Row-level concurrency is generally available on Databricks Runtime 14.2 and above. It is supported by default for unpartitioned tables with deletion vectors enabled.

Tables with partitions do not support row-level concurrency but can still avoid conflicts between OPTIMIZE and all other write operations when deletion vectors are enabled. See Limitations for row-level concurrency.

For other Databricks Runtime versions, see Row-level concurrency preview behavior (legacy).

MERGE INTO support for row-level concurrency requires Photon in Databricks Runtime 14.2. In Databricks Runtime 14.3 LTS and above, Photon is not required.

The following table describes which pairs of write operations can conflict in each isolation level with row-level concurrency enabled.

important

(1) All INSERT operations in the tables above describe append operations that do not read any data from the same table before committing. INSERT operations that contain subqueries reading the same table support the same concurrency as MERGE.

REORG operations have isolation semantics identical to OPTIMIZE when rewriting data files to reflect changes recorded in deletion vectors. When you use REORG to apply an upgrade, table protocols change, which conflicts with all ongoing operations.

Write conflicts without row-level concurrency

The following table describes which pairs of write operations can conflict in each isolation level.

Tables do not support row-level concurrency if they have partitions defined or do not have deletion vectors enabled. Databricks Runtime 14.2 or above is required for row-level concurrency.

important

(1) All INSERT operations in the tables above describe append operations that do not read any data from the same table before committing. INSERT operations that contain subqueries reading the same table support the same concurrency as MERGE.

REORG operations have isolation semantics identical to OPTIMIZE when rewriting data files to reflect changes recorded in deletion vectors. When you use REORG to apply an upgrade, table protocols change, which conflicts with all ongoing operations.

Limitations for row-level concurrency

Some limitations apply for row-level concurrency. For the following operations, conflict resolution follows normal concurrency for write conflicts on Databricks. See Write conflicts without row-level concurrency.

note

Row-level conflict detection can increase total execution time. When there are many concurrent transactions, the writer prioritizes latency over conflict resolution, and conflicts may occur.

All limitations for deletion vectors also apply. See Limitations.

When does Delta Lake commit without reading the table?

Delta Lake INSERT or append operations do not read the table state before committing if the following conditions are satisfied:

  1. The logic is expressed using an INSERT SQL statement or append mode.
  2. The logic contains no subqueries or conditionals that reference the table targeted by the write operation.

As in other commits, Delta Lake validates and resolves the table versions on commit using metadata in the transaction log, but no version of the table is actually read.
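The two conditions above can be restated as a simple predicate. The helper below is hypothetical (it is not a Delta Lake API); it only encodes the rule for when a commit skips reading table state:

```python
# Hypothetical helper, not a Delta Lake API: encodes the two documented
# conditions under which a write commits without reading the table state.

def commits_without_reading(is_insert_or_append: bool,
                            references_target_table: bool) -> bool:
    """Only blind appends skip the read: INSERT/append-mode logic with no
    subquery or conditional referencing the target table."""
    return is_insert_or_append and not references_target_table

# INSERT INTO t VALUES (...): a blind append, commits without reading t.
print(commits_without_reading(True, False))   # True
# INSERT INTO t SELECT ... FROM t: the subquery reads the target table.
print(commits_without_reading(True, True))    # False
# MERGE INTO t ...: always reads the target table before committing.
print(commits_without_reading(False, True))   # False
```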

note

Many common patterns use MERGE operations to insert data based on table conditions. Although it might be possible to rewrite this logic using INSERT statements, if any conditional expression references a column in the target table, these statements have the same concurrency limitations as MERGE.

Write serializable vs. serializable isolation levels

The isolation level of a table defines the degree to which a transaction must be isolated from modifications made by concurrent transactions. Delta Lake on Databricks supports two isolation levels: Serializable and WriteSerializable.

Read operations always use snapshot isolation. The write isolation level determines whether it is possible for a reader to see a snapshot of a table that, according to the history, "never existed".

For the Serializable level, a reader always sees only tables that conform to the history. For the WriteSerializable level, a reader could see a table that does not exist in the Delta log.

For example, consider txn1, a long-running delete, and txn2, which inserts data deleted by txn1. txn2 and txn1 complete and are recorded in that order in the history. According to the history, the data inserted by txn2 should not exist in the table. At the Serializable level, a reader would never see data inserted by txn2. However, at the WriteSerializable level, a reader could at some point see the data inserted by txn2.

For more information on which types of operations can conflict with each other in each isolation level and the possible errors, see Avoid conflicts using partitioning and disjoint command conditions.

Set the isolation level

You set the isolation level using the ALTER TABLE command.

SQL

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)

where <level-name> is Serializable or WriteSerializable.

For example, to change the isolation level from the default WriteSerializable to Serializable, run:

SQL

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

Avoid conflicts using partitioning and disjoint command conditions

In all cases marked "can conflict", whether the two operations conflict depends on whether they operate on the same set of files. You can make the two sets of files disjoint by partitioning the table by the same columns as those used in the conditions of the operations. For example, the two commands UPDATE table WHERE date > '2010-01-01' ... and DELETE table WHERE date < '2010-01-01' conflict if the table is not partitioned by date, because both can attempt to modify the same set of files. Partitioning the table by date avoids the conflict. Partitioning a table according to the conditions commonly used in commands can therefore reduce conflicts significantly. However, partitioning a table by a column with high cardinality can lead to other performance issues due to the large number of subdirectories.
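The effect of partitioning on this example can be sketched in a few lines of Python. The file layout below is invented for illustration; only the idea carries over: partitioning by the filter column makes the touched file sets disjoint.

```python
# Illustrative sketch, not Delta internals: whether two operations conflict
# depends on whether the sets of files they rewrite overlap.
from datetime import date

rows = [date(2009, 6, 1), date(2010, 6, 1), date(2011, 6, 1)]

def files_touched(partitioned_by_date, predicate):
    """Return the (hypothetical) set of files an operation must rewrite."""
    if partitioned_by_date:
        # One file per date partition; only matching partitions are touched.
        return {f"date={d}/part-0" for d in rows if predicate(d)}
    # Unpartitioned: rows of all dates share files, so any matching
    # predicate may touch every file.
    return {"part-0"} if any(predicate(d) for d in rows) else set()

cutoff = date(2010, 1, 1)
update = lambda d: d > cutoff   # UPDATE table WHERE date > '2010-01-01'
delete = lambda d: d < cutoff   # DELETE table WHERE date < '2010-01-01'

# Unpartitioned: both operations rewrite the same file -> conflict.
print(files_touched(False, update) & files_touched(False, delete))  # {'part-0'}
# Partitioned by date: disjoint file sets -> no conflict.
print(files_touched(True, update) & files_touched(True, delete))    # set()
```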

Conflict exceptions

When a transaction conflict occurs, you will observe one of the following exceptions:

ConcurrentAppendException

This exception occurs when a concurrent operation adds files in the same partition (or anywhere in an unpartitioned table) that your operation reads. The file additions can be caused by INSERT, DELETE, UPDATE, or MERGE operations.

With the default isolation level of WriteSerializable, files added by blind INSERT operations (that is, operations that blindly append data without reading any data) do not conflict with any operation, even if they touch the same partition (or anywhere in an unpartitioned table). If the isolation level is set to Serializable, then blind appends may conflict.
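The rule above can be restated as a tiny decision function. This is a hedged sketch of the documented behavior (the function name and shape are invented), not Delta Lake code:

```python
# Hypothetical sketch of the documented rule, not Delta Lake code: whether a
# blind append can conflict with concurrent operations depends on the
# table's isolation level.

def blind_append_may_conflict(isolation_level: str) -> bool:
    """Under WriteSerializable (the default), blind appends do not conflict
    with other operations; under Serializable they may conflict with
    concurrent DELETE, UPDATE, or MERGE operations."""
    if isolation_level == "WriteSerializable":
        return False
    if isolation_level == "Serializable":
        return True
    raise ValueError(f"unknown isolation level: {isolation_level}")

print(blind_append_may_conflict("WriteSerializable"))  # False
print(blind_append_may_conflict("Serializable"))       # True
```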

important

Blind appends can conflict in WriteSerializable mode when multiple concurrent transactions running DELETE, UPDATE, or MERGE operations might reference values inserted by blind appends. To avoid this conflict, make the separation between concurrent operations explicit in their conditions, as shown in the example below.

This exception is often thrown during concurrent DELETE, UPDATE, or MERGE operations. While the concurrent operations may be physically updating different partition directories, one of them may read the same partition that the other one concurrently updates, thus causing a conflict. You can avoid this by making the separation explicit in the operation condition. Consider the following example.

Scala


deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Suppose you run the above code concurrently for different dates or countries. Because each job works on an independent partition of the target Delta table, you don't expect any conflicts. However, the condition is not explicit enough: it can scan the entire table and conflict with concurrent operations updating any other partitions. Instead, rewrite the statement to add a specific date and country to the merge condition, as shown in the following example.

Scala


deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country" +
      " AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

This operation is now safe to run concurrently on different dates and countries.

ConcurrentDeleteReadException

This exception occurs when a concurrent operation deletes a file that your operation reads. Common causes are a DELETE, UPDATE, or MERGE operation that rewrites files.

ConcurrentDeleteDeleteException

This exception occurs when a concurrent operation deletes a file that your operation also deletes. It can be caused by two concurrent compaction operations rewriting the same files.

MetadataChangedException

This exception occurs when a concurrent transaction updates the metadata of a Delta table. Common causes are ALTER TABLE operations or writes to your Delta table that update the schema of the table.

ConcurrentTransactionException

This exception occurs if a streaming query using the same checkpoint location is started multiple times concurrently and the instances try to write to the Delta table at the same time. Never run two streaming queries that use the same checkpoint location at the same time.

ProtocolChangedException

This exception occurs when a concurrent operation changes the protocol of the Delta table, for example when an operation upgrades the table protocol or enables a table feature.

See Delta Lake feature compatibility and protocols for more details.

Row-level concurrency preview behavior (legacy)

This section describes preview behaviors for row-level concurrency in Databricks Runtime 14.1 and below. Row-level concurrency always requires deletion vectors.

In Databricks Runtime 13.3 LTS and above, tables with liquid clustering enabled automatically enable row-level concurrency.

In Databricks Runtime 14.0 and 14.1, you can enable row-level concurrency for tables with deletion vectors by setting the following configuration for the cluster or SparkSession:

ini

spark.databricks.delta.rowLevelConcurrencyPreview = true

In Databricks Runtime 14.1 and below, non-Photon compute only supports row-level concurrency for DELETE operations.

