The Databricks SQL Connector for Python is a Python library that allows you to use Python code to run SQL commands on Databricks all-purpose compute and Databricks SQL warehouses. The Databricks SQL Connector for Python is easier to set up and use than similar Python libraries such as pyodbc. This library follows PEP 249 - Python Database API Specification v2.0.
important
Databricks SQL Connector for Python version 3.0.0 and above supports native parameterized query execution, which prevents SQL injection and can improve query performance. Previous versions used inline parameterized execution, which is not safe from SQL injection and has other drawbacks. For more information, see Using Native Parameters.
The Databricks SQL Connector for Python also supports the SQLAlchemy dialect for Databricks, but it must be installed to use these features. See Use SQLAlchemy with Databricks.
Requirements
Install the Databricks SQL Connector for Python. PyArrow is an optional dependency of the Databricks SQL Connector for Python and is not installed by default in version 4.0.0 and above of the connector. If PyArrow is not installed, features such as CloudFetch and other Apache Arrow functionality are not available, which may impact performance for large volumes of data.
pip install databricks-sql-connector
To include the optional PyArrow dependency, install the pyarrow extra instead:
pip install databricks-sql-connector[pyarrow]
Gather the following information for the all-purpose compute or SQL warehouse that you want to use:
The server hostname of the all-purpose compute. You can get this from the Server Hostname value in the Advanced Options > JDBC/ODBC tab for your all-purpose compute.
The HTTP path of the all-purpose compute. You can get this from the HTTP Path value in the Advanced Options > JDBC/ODBC tab for your all-purpose compute.
note
The SQL connector does not support connecting to jobs compute.
The Databricks SQL Connector for Python supports the following Databricks authentication types: Databricks personal access token authentication, OAuth machine-to-machine (M2M) authentication, and OAuth user-to-machine (U2M) authentication.
Databricks personal access token authentication
To use the Databricks SQL Connector for Python with Databricks personal access token authentication, you must first create a Databricks personal access token. To do so, follow the steps in Databricks personal access tokens for workspace users.
To authenticate the Databricks SQL Connector for Python, use the following code snippet. This snippet assumes that you have set the following environment variables:
DATABRICKS_SERVER_HOSTNAME, set to the Server Hostname value for your all-purpose compute or SQL warehouse.
DATABRICKS_HTTP_PATH, set to the HTTP Path value for your all-purpose compute or SQL warehouse.
DATABRICKS_TOKEN, set to the Databricks personal access token.
To set environment variables, see your operating system's documentation.
Python
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
    # Run queries here; see the Examples section of this article.
    ...
OAuth machine-to-machine (M2M) authentication
Databricks SQL Connector for Python versions 2.5.0 and above support OAuth machine-to-machine (M2M) authentication. You must also install the Databricks SDK for Python (for example by running pip install databricks-sdk or python -m pip install databricks-sdk).
To use the Databricks SQL Connector for Python with OAuth M2M authentication, you must do the following:
Create a Databricks service principal in your Databricks workspace, and create an OAuth secret for that service principal.
To create the service principal and its OAuth secret, see Authorize service principal access to Databricks with OAuth. Make a note of the service principal's UUID or Application ID value, and the Secret value for the service principal's OAuth secret.
Give that service principal access to your all-purpose compute or warehouse.
To give the service principal access to your all-purpose compute or warehouse, see Compute permissions or Manage a SQL warehouse.
To authenticate the Databricks SQL Connector for Python, use the following code snippet. This snippet assumes that you have set the following environment variables:
DATABRICKS_SERVER_HOSTNAME, set to the Server Hostname value for your all-purpose compute or SQL warehouse.
DATABRICKS_HTTP_PATH, set to the HTTP Path value for your all-purpose compute or SQL warehouse.
DATABRICKS_CLIENT_ID, set to the service principal's UUID or Application ID value.
DATABRICKS_CLIENT_SECRET, set to the Secret value for the service principal's OAuth secret.
To set environment variables, see your operating system's documentation.
Python
from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")

def credential_provider():
    config = Config(
        host          = f"https://{server_hostname}",
        client_id     = os.getenv("DATABRICKS_CLIENT_ID"),
        client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
    return oauth_service_principal(config)

with sql.connect(server_hostname      = server_hostname,
                 http_path            = os.getenv("DATABRICKS_HTTP_PATH"),
                 credentials_provider = credential_provider) as connection:
    # Run queries here; see the Examples section of this article.
    ...
OAuth user-to-machine (U2M) authentication
Databricks SQL Connector for Python versions 2.1.0 and above support OAuth user-to-machine (U2M) authentication.
To authenticate the Databricks SQL Connector for Python with OAuth U2M authentication, use the following code snippet. OAuth U2M authentication uses real-time human sign-in and consent to authenticate the target Databricks user account. This snippet assumes that you have set the following environment variables:
DATABRICKS_SERVER_HOSTNAME, set to the Server Hostname value for your all-purpose compute or SQL warehouse.
DATABRICKS_HTTP_PATH, set to the HTTP Path value for your all-purpose compute or SQL warehouse.
To set environment variables, see your operating system's documentation.
Python
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 auth_type       = "databricks-oauth") as connection:
    # OAuth U2M prompts you to sign in and consent in real time.
    # Run queries here; see the Examples section of this article.
    ...
Examples
The following code examples demonstrate how to use the Databricks SQL Connector for Python to query and insert data, query metadata, manage cursors and connections, and configure logging.
note
The following code examples demonstrate how to use a Databricks personal access token for authentication. To use other available Databricks authentication types instead, see Authentication.
These code examples retrieve their server_hostname, http_path, and access_token connection variable values from these environment variables:
DATABRICKS_SERVER_HOSTNAME, which represents the Server Hostname value from the requirements.
DATABRICKS_HTTP_PATH, which represents the HTTP Path value from the requirements.
DATABRICKS_TOKEN, which represents your access token from the requirements.
You can use other approaches to retrieving these connection variable values. Using environment variables is just one approach among many.
The following code example demonstrates how to set the User-Agent application product_name for usage tracking.
Python
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN"),
                 user_agent_entry = "product_name") as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT 1 + 1")
        result = cursor.fetchall()

        for row in result:
            print(row)
Query data
The following code example demonstrates how to call the Databricks SQL Connector for Python to run a basic SQL command on all-purpose compute or a SQL warehouse. This command returns the first two rows from the trips table in the samples catalog's nyctaxi schema.
Python
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
        result = cursor.fetchall()

        for row in result:
            print(row)
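If you install the connector with the pyarrow extra described in the requirements, query results can also be fetched as Apache Arrow data. The following is a minimal sketch that assumes the fetchall_arrow cursor method and a PyArrow installation are available in your connector version; verify this against the reference for the version you use.
Python
from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
        # Assumption: fetchall_arrow returns a pyarrow.Table when PyArrow is installed.
        table = cursor.fetchall_arrow()
        print(table.num_rows, table.column_names)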
Insert data
The following example demonstrates how to insert small amounts of data (thousands of rows):
Python
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")

        squares = [(i, i * i) for i in range(100)]
        values  = ",".join([f"({x}, {y})" for (x, y) in squares])

        cursor.execute(f"INSERT INTO squares VALUES {values}")

        cursor.execute("SELECT * FROM squares LIMIT 10")
        result = cursor.fetchall()

        for row in result:
            print(row)
For large amounts of data, you should first upload the data to cloud storage and then execute the COPY INTO command.
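For example, the following minimal sketch runs COPY INTO through the connector. The volume path, file name, and CSV options are illustrative assumptions; substitute the cloud storage or volume location where you actually staged the data.
Python
from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        # Assumes a CSV with columns (x, x_squared) was already uploaded to this
        # hypothetical volume path.
        cursor.execute("""
            COPY INTO squares
            FROM '/Volumes/main/default/my-volume/squares.csv'
            FILEFORMAT = CSV
            FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
        """)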
Query metadata
There are dedicated methods for retrieving metadata. The following example retrieves metadata about columns in a sample table:
Python
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        cursor.columns(schema_name="default", table_name="squares")
        print(cursor.fetchall())
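The connector provides similar metadata helpers on the cursor, such as tables for listing tables. The following sketch assumes tables accepts keyword filters in the same style as columns; confirm the exact signature for your connector version.
Python
from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        # Assumption: tables() filters by schema name, analogous to columns().
        cursor.tables(schema_name="default")
        print(cursor.fetchall())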
Manage cursors and connections
It is a best practice to close any connections and cursors that are no longer in use. This frees resources on Databricks all-purpose compute and Databricks SQL warehouses.
You can use a context manager (the with syntax used in previous examples) to manage the resources, or explicitly call close:
Python
from databricks import sql
import os
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())
cursor.close()
connection.close()
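When you close resources explicitly, wrapping the calls in try/finally blocks (standard Python practice rather than anything connector-specific) ensures they are released even if a query raises an exception:
Python
from databricks import sql
import os

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))
try:
    cursor = connection.cursor()
    try:
        cursor.execute("SELECT * from range(10)")
        print(cursor.fetchall())
    finally:
        cursor.close()      # runs even if execute or fetchall raises
finally:
    connection.close()      # always release the connection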
Manage files in Unity Catalog volumes
The Databricks SQL Connector enables you to write local files to Unity Catalog volumes, download files from volumes, and delete files from volumes, as shown in the following example:
Python
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN"),
                 staging_allowed_local_path = "/tmp/") as connection:
    with connection.cursor() as cursor:
        # Write a local file to the specified path in a volume. The local path
        # must be within staging_allowed_local_path.
        cursor.execute(
            "PUT '/tmp/my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE"
        )

        # Download a file from a volume to a local path.
        cursor.execute(
            "GET '/Volumes/main/default/my-volume/my-data.csv' TO '/tmp/my-downloaded-data.csv'"
        )

        # Delete a file from a volume.
        cursor.execute(
            "REMOVE '/Volumes/main/default/my-volume/my-data.csv'"
        )
Configure logging
The Databricks SQL Connector uses Python's standard logging module. You can configure the logging level as in the following example:
Python
from databricks import sql
import os, logging
logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
                    level = logging.DEBUG)

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
result = cursor.fetchall()
for row in result:
logging.debug(row)
cursor.close()
connection.close()
Testing
To test your code, use Python test frameworks such as pytest. To test your code under simulated conditions without calling Databricks REST API endpoints or changing the state of your Databricks accounts or workspaces, you can use Python mocking libraries such as unittest.mock.
For example, given the following file named helpers.py containing a get_connection_personal_access_token function that uses a Databricks personal access token to return a connection to a Databricks workspace, and a select_nyctaxi_trips function that uses the connection to get the specified number of data rows from the trips table in the samples catalog's nyctaxi schema:
Python
from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor
def get_connection_personal_access_token(
    server_hostname: str,
    http_path: str,
    access_token: str
) -> Connection:
    return sql.connect(
        server_hostname = server_hostname,
        http_path = http_path,
        access_token = access_token
    )

def select_nyctaxi_trips(
    connection: Connection,
    num_rows: int
) -> List[Row]:
    cursor: Cursor = connection.cursor()
    cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
    result: List[Row] = cursor.fetchall()
    return result
And given the following file named main.py that calls the get_connection_personal_access_token and select_nyctaxi_trips functions:
Python
from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips
connection: Connection = get_connection_personal_access_token(
    server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
    http_path = os.getenv("DATABRICKS_HTTP_PATH"),
    access_token = os.getenv("DATABRICKS_TOKEN")
)

rows: List[Row] = select_nyctaxi_trips(
    connection = connection,
    num_rows = 2
)

for row in rows:
    print(row)
The following file named test_helpers.py tests whether the select_nyctaxi_trips function returns the expected response. Rather than creating a real connection to the target workspace, this test mocks a Connection object. The test also mocks some data that conforms to the schema and values that are in the real data. The test returns the mocked data through the mocked connection and then checks whether one of the mocked data rows' values matches the expected value.
Python
import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec
@pytest.fixture
def mock_data() -> List[Row]:
    return [
        Row(
            tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
            tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
            trip_distance = 4.94,
            fare_amount = 19.0,
            pickup_zip = 10282,
            dropoff_zip = 10171
        ),
        Row(
            tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
            tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
            trip_distance = 0.28,
            fare_amount = 3.5,
            pickup_zip = 10110,
            dropoff_zip = 10110
        )
    ]

def test_select_nyctaxi_trips(mock_data: List[Row]):
    mock_connection = create_autospec(Connection)
    mock_connection.cursor().fetchall.return_value = mock_data

    response: List[Row] = select_nyctaxi_trips(
        connection = mock_connection,
        num_rows = 2)

    assert response[1].fare_amount == 3.5
Because the select_nyctaxi_trips function contains a SELECT statement and therefore does not change the state of the trips table, mocking is not absolutely required in this example. However, mocking enables you to quickly run your tests without waiting for an actual connection to be made with the workspace. Also, mocking enables you to run simulated tests multiple times for functions that might change a table's state, such as INSERT INTO, UPDATE, and DELETE FROM.
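For example, the following minimal sketch tests a hypothetical insert_square function (not part of the helpers.py file above) by asserting against the mocked cursor instead of real table state; the function and its SQL are assumptions for illustration.
Python
from unittest.mock import create_autospec
from databricks.sql.client import Connection

# Hypothetical function under test: it changes table state, so mocking lets the
# test run repeatedly without modifying a real table.
def insert_square(connection: Connection, x: int) -> None:
    cursor = connection.cursor()
    cursor.execute(f"INSERT INTO squares VALUES ({x}, {x * x})")

def test_insert_square():
    mock_connection = create_autospec(Connection)

    insert_square(mock_connection, 3)

    # Verify the SQL that would have been sent, without touching a real table.
    mock_connection.cursor().execute.assert_called_with(
        "INSERT INTO squares VALUES (3, 9)")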
API reference
Package: databricks-sql-connector
Usage: pip install databricks-sql-connector
See also databricks-sql-connector in the Python Package Index (PyPI).
Module: databricks.sql
Usage: from databricks import sql
Selected classes include the following:
Connection class
To create a Connection object, call the databricks.sql.connect method with connection parameters such as server_hostname, http_path, and access_token, as shown in the authentication examples earlier in this article.
Selected Connection methods include cursor and close, both demonstrated in Manage cursors and connections.
Cursor class
To create a Cursor object, call the Connection class's cursor method.
Selected Cursor methods include execute, fetchall, columns, and close, which are demonstrated in the examples earlier in this article.
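The following minimal sketch shows the fetch variants together. The execute, fetchall, and close calls appear in this article's examples; fetchone and fetchmany are standard PEP 249 methods that the connector, as a PEP 249 implementation, also provides.
Python
from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 10")
        first = cursor.fetchone()    # a single Row, or None if no rows remain
        batch = cursor.fetchmany(3)  # a list of up to 3 Rows
        rest  = cursor.fetchall()    # all remaining Rows
        print(first, len(batch), len(rest))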
Row class
The Row class is a tuple-like data structure that represents an individual result row. If the row contains a column with the name "my_column", you can access the "my_column" field of row via row.my_column. You can also use numeric indices to access fields, for example row[0]. If the column name is not allowed as an attribute method name (for example, it begins with a digit), then you can access the field as row["1_my_column"].
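For example, the following minimal sketch constructs a Row directly, as the test example earlier in this article does, to show the access styles:
Python
from databricks.sql.client import Row

row = Row(trip_distance = 4.94, fare_amount = 19.0)

print(row.trip_distance)     # attribute access
print(row["fare_amount"])    # key access
print(row[0])                # numeric index access (the row's first field)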
Since version 1.0
Selected Row methods include:
asDict
Returns a dictionary representation of the row, indexed by field names. If there are duplicate field names, one of the duplicate fields (but only one) is returned in the dictionary; which duplicate field is returned is not defined.
No parameters.
Returns a dict of fields.
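For instance, continuing the Row sketch above:
Python
from databricks.sql.client import Row

row = Row(trip_distance = 4.94, fare_amount = 19.0)
print(row.asDict())   # a dict mapping field names to values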
The connector maps Apache Spark SQL data types to their Python data type equivalents, for example string to str, date to datetime.date, timestamp to datetime.datetime, and decimal to decimal.Decimal.
Troubleshooting
tokenAuthWrapperInvalidAccessToken: Invalid access token message
Issue: When you run your code, you see a message similar to Error during request to server: tokenAuthWrapperInvalidAccessToken: Invalid access token.
Possible cause: The value passed to access_token is not a valid Databricks personal access token.
Recommended fix: Check that the value passed to access_token is correct and try again.
gaierror(8, 'nodename nor servname provided, or not known') message
Issue: When you run your code, you see a message similar to Error during request to server: gaierror(8, 'nodename nor servname provided, or not known').
Possible cause: The value passed to server_hostname is not the correct host name.
Recommended fix: Check that the value passed to server_hostname is correct and try again.
For more information on finding the server hostname, see Get connection details for a Databricks compute resource.
IpAclError message
Issue: When you run your code, you see the message Error during request to server: IpAclValidation when you try to use the connector on a Databricks notebook.
Possible cause: You may have IP allow listing enabled for the Databricks workspace. With IP allow listing, connections from Spark clusters back to the control plane are not allowed by default.
Recommended fix: Ask your administrator to add the compute plane subnet to the IP allow list.
Additional resources
For more information, see the Python documentation for:
bool, bytearray, float, int, and str
datetime.date and datetime.datetime
decimal.Decimal
NoneType
) on the Python websiteRetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4