
Databricks SQL Connector for Python

The Databricks SQL Connector for Python is a Python library that allows you to use Python code to run SQL commands on Databricks all-purpose compute and Databricks SQL warehouses. The Databricks SQL Connector for Python is easier to set up and use than similar Python libraries such as pyodbc. This library follows PEP 249 – Python Database API Specification v2.0.

important

Databricks SQL Connector for Python version 3.0.0 and above supports native parameterized query execution, which prevents SQL injection and can improve query performance. Previous versions used inline parameterized execution, which is not safe from SQL injection and has other drawbacks. For more information, see Using Native Parameters.
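As an illustrative sketch of native parameterized execution, the query below binds a value through a named :marker and a separate dictionary of parameters rather than interpolating it into the SQL string; the table and filter value are examples only:

Python

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        # The value is passed separately from the SQL text, so it is bound as a
        # typed parameter instead of being spliced into the statement.
        cursor.execute(
            "SELECT * FROM samples.nyctaxi.trips WHERE pickup_zip = :pickup_zip LIMIT 2",
            {"pickup_zip": 10282}
        )
        print(cursor.fetchall())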

The Databricks SQL Connector for Python also supports the SQLAlchemy dialect for Databricks, but SQLAlchemy must be installed separately to use these features. See Use SQLAlchemy with Databricks.
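For orientation, the following is a minimal sketch of connecting through SQLAlchemy once the dialect is installed (for example via the databricks-sql-connector[sqlalchemy] extra or the separate databricks-sqlalchemy package, depending on your connector version); the connection URL shape and the samples/nyctaxi catalog and schema are assumptions for illustration:

Python

import os
from sqlalchemy import create_engine, text

# Assumed connection URL shape for the Databricks SQLAlchemy dialect.
engine = create_engine(
    f"databricks://token:{os.getenv('DATABRICKS_TOKEN')}@{os.getenv('DATABRICKS_SERVER_HOSTNAME')}"
    f"?http_path={os.getenv('DATABRICKS_HTTP_PATH')}&catalog=samples&schema=nyctaxi"
)

with engine.connect() as conn:
    for row in conn.execute(text("SELECT * FROM trips LIMIT 2")):
        print(row)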

Requirements

Get started

Authentication

The Databricks SQL Connector for Python supports the following Databricks authentication types:

  * Databricks personal access token authentication
  * OAuth machine-to-machine (M2M) authentication
  * OAuth user-to-machine (U2M) authentication

Databricks personal access token authentication

To use the Databricks SQL Connector for Python with Databricks personal access token authentication, you must first create a Databricks personal access token. To do so, follow the steps in Databricks personal access tokens for workspace users.

To authenticate the Databricks SQL Connector for Python, use the following code snippet. This snippet assumes that you have set the following environment variables:

  * DATABRICKS_SERVER_HOSTNAME, set to the Server Hostname value for your all-purpose compute or SQL warehouse.
  * DATABRICKS_HTTP_PATH, set to the HTTP Path value for your all-purpose compute or SQL warehouse.
  * DATABRICKS_TOKEN, set to your Databricks personal access token.

To set environment variables, see your operating system's documentation.

Python

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
    # Work with the connection here, for example by creating a cursor.
    ...

OAuth machine-to-machine (M2M) authentication

Databricks SQL Connector for Python versions 2.5.0 and above support OAuth machine-to-machine (M2M) authentication. You must also install the Databricks SDK for Python (for example by running pip install databricks-sdk or python -m pip install databricks-sdk).

To use the Databricks SQL Connector for Python with OAuth M2M authentication, you must do the following:

  1. Create a Databricks service principal in your Databricks workspace, and create an OAuth secret for that service principal.

    To create the service principal and its OAuth secret, see Authorize service principal access to Databricks with OAuth. Make a note of the service principal's UUID or Application ID value, and the Secret value for the service principal's OAuth secret.

  2. Give that service principal access to your all-purpose compute or warehouse.

    To give the service principal access to your all-purpose compute or warehouse, see Compute permissions or Manage a SQL warehouse.

To authenticate the Databricks SQL Connector for Python, use the following code snippet. This snippet assumes that you have set the following environment variables:

  * DATABRICKS_SERVER_HOSTNAME, set to the Server Hostname value for your all-purpose compute or SQL warehouse.
  * DATABRICKS_HTTP_PATH, set to the HTTP Path value for your all-purpose compute or SQL warehouse.
  * DATABRICKS_CLIENT_ID, set to the service principal's UUID or Application ID value.
  * DATABRICKS_CLIENT_SECRET, set to the Secret value for the service principal's OAuth secret.

To set environment variables, see your operating system's documentation.

Python

from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os

server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")

def credential_provider():
    config = Config(
        host = f"https://{server_hostname}",
        client_id = os.getenv("DATABRICKS_CLIENT_ID"),
        client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
    return oauth_service_principal(config)

with sql.connect(server_hostname = server_hostname,
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 credentials_provider = credential_provider) as connection:
    # Work with the connection here, for example by creating a cursor.
    ...

OAuth user-to-machine (U2M) authentication

Databricks SQL Connector for Python versions 2.1.0 and above support OAuth user-to-machine (U2M) authentication.

To authenticate the Databricks SQL Connector for Python with OAuth U2M authentication, use the following code snippet. OAuth U2M authentication uses real-time human sign-in and consent to authenticate the target Databricks user account. This snippet assumes that you have set the following environment variables:

  * DATABRICKS_SERVER_HOSTNAME, set to the Server Hostname value for your all-purpose compute or SQL warehouse.
  * DATABRICKS_HTTP_PATH, set to the HTTP Path value for your all-purpose compute or SQL warehouse.

To set environment variables, see your operating system's documentation.

Python

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 auth_type = "databricks-oauth") as connection:
    # Work with the connection here, for example by creating a cursor.
    ...

Examples

The following code examples demonstrate how to use the Databricks SQL Connector for Python to query and insert data, query metadata, manage cursors and connections, and configure logging.

note

The following code examples demonstrate how to use a Databricks personal access token for authentication. To use other available Databricks authentication types instead, see Authentication.

These code examples retrieve their server_hostname, http_path, and access_token connection variable values from these environment variables:

  * DATABRICKS_SERVER_HOSTNAME, which represents the Server Hostname value for your all-purpose compute or SQL warehouse.
  * DATABRICKS_HTTP_PATH, which represents the HTTP Path value for your all-purpose compute or SQL warehouse.
  * DATABRICKS_TOKEN, which represents your Databricks personal access token.

You can use other approaches to retrieving these connection variable values. Using environment variables is just one approach among many.

Set User-Agent

The following code example demonstrates how to set the User-Agent application entry (product_name in this example) for usage tracking.

Python

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN"),
                 user_agent_entry = "product_name") as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT 1 + 1")
        result = cursor.fetchall()

        for row in result:
            print(row)

Query data

The following code example demonstrates how to call the Databricks SQL Connector for Python to run a basic SQL command on all-purpose compute or a SQL warehouse. This command returns the first two rows from the trips table in the samples catalog's nyctaxi schema.

Python

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:

    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
        result = cursor.fetchall()

        for row in result:
            print(row)

Insert data

The following example demonstrates how to insert small amounts of data (thousands of rows):

Python

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:

    with connection.cursor() as cursor:
        cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")

        squares = [(i, i * i) for i in range(100)]
        values = ",".join([f"({x}, {y})" for (x, y) in squares])

        cursor.execute(f"INSERT INTO squares VALUES {values}")

        cursor.execute("SELECT * FROM squares LIMIT 10")

        result = cursor.fetchall()

        for row in result:
            print(row)

For large amounts of data, you should first upload the data to cloud storage and then execute the COPY INTO command.
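As a rough sketch of that pattern, the following runs a COPY INTO statement through the connector after the files have already been uploaded; the target table, volume path, and format options are hypothetical:

Python

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        # Hypothetical source location and target table; the CSV files are assumed
        # to have been uploaded to this volume path beforehand.
        cursor.execute("""
            COPY INTO main.default.squares
            FROM '/Volumes/main/default/my-volume/squares-data/'
            FILEFORMAT = CSV
            FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
        """)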

Query metadata

There are dedicated methods for retrieving metadata. The following example retrieves metadata about columns in a sample table:

Python

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:

    with connection.cursor() as cursor:
        cursor.columns(schema_name="default", table_name="squares")
        print(cursor.fetchall())

Manage cursors and connections

It is a best practice to close any connections and cursors that are no longer in use. This frees resources on Databricks all-purpose compute and Databricks SQL warehouses.

You can use a context manager (the with syntax used in previous examples) to manage the resources, or explicitly call close:

Python

from databricks import sql
import os

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())

cursor.close()
connection.close()

Manage files in Unity Catalog volumes

The Databricks SQL Connector enables you to write local files to Unity Catalog volumes, download files from volumes, and delete files from volumes, as shown in the following example:

Python

from databricks import sql
import os

# To write local files to volumes or download files from volumes, set the
# staging_allowed_local_path argument to the local folder that contains the
# files to upload or that will receive the downloaded files.
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN"),
                 staging_allowed_local_path = "/tmp/") as connection:

    with connection.cursor() as cursor:

        # Write a local file to the specified path in a volume. The local file
        # must be under staging_allowed_local_path; OVERWRITE replaces any
        # existing file at the target path.
        cursor.execute(
            "PUT '/tmp/my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE"
        )

        # Download a file from the specified path in a volume.
        cursor.execute(
            "GET '/Volumes/main/default/my-volume/my-data.csv' TO '/tmp/my-downloaded-data.csv'"
        )

        # Delete a file from the specified path in a volume.
        cursor.execute(
            "REMOVE '/Volumes/main/default/my-volume/my-data.csv'"
        )

Configure logging

The Databricks SQL Connector uses Python's standard logging module. You can configure the logging level as shown in the following example:

Python

from databricks import sql
import os, logging

logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
                    level = logging.DEBUG)

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")

result = cursor.fetchall()

for row in result:
    logging.debug(row)

cursor.close()
connection.close()

Testing

To test your code, use Python test frameworks such as pytest. To test your code under simulated conditions without calling Databricks REST API endpoints or changing the state of your Databricks accounts or workspaces, you can use Python mocking libraries such as unittest.mock.

For example, given the following file named helpers.py containing a get_connection_personal_access_token function that uses a Databricks personal access token to return a connection to a Databricks workspace, and a select_nyctaxi_trips function that uses the connection to get the specified number of data rows from the trips table in the samples catalog's nyctaxi schema:

Python



# helpers.py

from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor

def get_connection_personal_access_token(
    server_hostname: str,
    http_path: str,
    access_token: str
) -> Connection:
    return sql.connect(
        server_hostname = server_hostname,
        http_path = http_path,
        access_token = access_token
    )

def select_nyctaxi_trips(
    connection: Connection,
    num_rows: int
) -> List[Row]:
    cursor: Cursor = connection.cursor()
    cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
    result: List[Row] = cursor.fetchall()
    return result

And given the following file named main.py that calls the get_connection_personal_access_token and select_nyctaxi_trips functions:

Python



# main.py

from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips

connection: Connection = get_connection_personal_access_token(
    server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
    http_path = os.getenv("DATABRICKS_HTTP_PATH"),
    access_token = os.getenv("DATABRICKS_TOKEN")
)

rows: List[Row] = select_nyctaxi_trips(
    connection = connection,
    num_rows = 2
)

for row in rows:
    print(row)

The following file named test_helpers.py tests whether the select_nyctaxi_trips function returns the expected response. Rather than creating a real connection to the target workspace, this test mocks a Connection object. The test also mocks some data that conforms to the schema and values that are in the real data. The test returns the mocked data through the mocked connection and then checks whether one of the mocked data rows' values matches the expected value.

Python



# test_helpers.py

import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec

@pytest.fixture
def mock_data() -> List[Row]:
    return [
        Row(
            tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
            tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
            trip_distance = 4.94,
            fare_amount = 19.0,
            pickup_zip = 10282,
            dropoff_zip = 10171
        ),
        Row(
            tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
            tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
            trip_distance = 0.28,
            fare_amount = 3.5,
            pickup_zip = 10110,
            dropoff_zip = 10110
        )
    ]

def test_select_nyctaxi_trips(mock_data: List[Row]):

    # Create a mocked Connection.
    mock_connection = create_autospec(Connection)

    # Return the mocked data through the mocked Connection's cursor.
    mock_connection.cursor().fetchall.return_value = mock_data

    # Call the function under test with the mocked Connection.
    response: List[Row] = select_nyctaxi_trips(
        connection = mock_connection,
        num_rows = 2)

    # Check the value of one of the mocked data rows' columns.
    assert response[1].fare_amount == 3.5

Because the select_nyctaxi_trips function contains a SELECT statement and therefore does not change the state of the trips table, mocking is not absolutely required in this example. However, mocking enables you to quickly run your tests without waiting for an actual connection to be made with the workspace. Also, mocking enables you to run simulated tests multiple times for functions that might change a table's state, such as INSERT INTO, UPDATE, and DELETE FROM.
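For state-changing functions, one common pattern is to assert on the SQL statement the function sends rather than on returned rows. The following is a minimal sketch using a hypothetical insert_square helper (defined inline here so the example is self-contained; it is not part of the files above):

Python

from unittest.mock import create_autospec
from databricks.sql.client import Connection

# Hypothetical helper under test; it changes table state, so the test below
# never runs it against a real workspace.
def insert_square(connection: Connection, x: int) -> None:
    cursor = connection.cursor()
    cursor.execute(f"INSERT INTO squares VALUES ({x}, {x * x})")

def test_insert_square_sends_expected_sql():
    # Mock the connection so no workspace call is made and no table is changed.
    mock_connection = create_autospec(Connection)

    insert_square(connection = mock_connection, x = 3)

    # Verify the statement that would have been executed.
    mock_connection.cursor().execute.assert_called_with("INSERT INTO squares VALUES (3, 9)")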

API reference

Package

databricks-sql-connector

Usage: pip install databricks-sql-connector

See also databricks-sql-connector in the Python Package Index (PyPI).

Module

databricks.sql

Usage: from databricks import sql

Classes

Selected classes include the following:

Connection class

To create a Connection object, call the databricks.sql.connect method. The connection parameters used in this article include server_hostname, http_path, and an authentication option (access_token, auth_type, or credentials_provider), plus optional arguments such as user_agent_entry and staging_allowed_local_path.

Connection methods used in this article include cursor and close.

Cursor class

To create a Cursor object, call the Connection class's cursor method.

Cursor methods used in this article include execute, fetchall, columns, and close.

Row class

The Row class is a tuple-like data structure that represents an individual result row. If the row contains a column named "my_column", you can access the "my_column" field of the row as row.my_column. You can also use numeric indices to access fields, for example row[0]. If the column name is not allowed as an attribute name (for example, it begins with a digit), then you can access the field as row["1_my_column"].

Since version 1.0
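As a minimal sketch of those access patterns, the Row below is constructed directly with illustrative field names; in practice rows come back from methods such as fetchall:

Python

from databricks.sql.client import Row

row = Row(my_column = "hello", fare_amount = 19.0)

print(row.my_column)      # access by attribute name
print(row[0])             # access by numeric index
print(row["my_column"])   # access by key, which also works for names that are not valid attributes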

Selected Row methods include:

asDict method

Returns a dictionary representation of the row, indexed by field names. If there are duplicate field names, one of the duplicate fields (but only one) will be returned in the dictionary; which duplicate field is returned is not defined.

No parameters.

Returns a dict of fields.
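For example, continuing the illustrative Row above:

Python

from databricks.sql.client import Row

row = Row(my_column = "hello", fare_amount = 19.0)

# asDict takes no parameters and returns the row as a dict keyed by field name,
# for example {'my_column': 'hello', 'fare_amount': 19.0}.
print(row.asDict())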

Type conversions

The following table maps Apache Spark SQL data types to their Python data type equivalents.
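In practice, result values arrive as native Python objects. The following small sketch uses columns from the examples above; the expected types shown are assumptions based on the usual timestamp-to-datetime and double-to-float conversions:

Python

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT tpep_pickup_datetime, fare_amount FROM samples.nyctaxi.trips LIMIT 1")
        row = cursor.fetchone()
        print(type(row.tpep_pickup_datetime))   # expected: <class 'datetime.datetime'>
        print(type(row.fare_amount))            # expected: <class 'float'>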

Troubleshooting

tokenAuthWrapperInvalidAccessToken: Invalid access token message

Issue: When you run your code, you see a message similar to Error during request to server: tokenAuthWrapperInvalidAccessToken: Invalid access token.

Possible cause: The value passed to access_token is not a valid Databricks personal access token.

Recommended fix: Check that the value passed to access_token is correct and try again.

gaierror(8, 'nodename nor servname provided, or not known') message

Issue: When you run your code, you see a message similar to Error during request to server: gaierror(8, 'nodename nor servname provided, or not known').

Possible cause: The value passed to server_hostname is not the correct host name.

Recommended fix: Check that the value passed to server_hostname is correct and try again.

For more information on finding the server hostname, see Get connection details for a Databricks compute resource.

IpAclError message

Issue: When you run your code, you see the message Error during request to server: IpAclValidation when you try to use the connector on a Databricks notebook.

Possible cause: You may have IP allow listing enabled for the Databricks workspace. With IP allow listing, connections from Spark clusters back to the control plane are not allowed by default.

Recommended fix: Ask your administrator to add the compute plane subnet to the IP allow list.

Additional resources

For more information, see:

