pandas user-defined functions

A pandas user-defined function (UDF)—also known as vectorized UDF—is a user-defined function that uses Apache Arrow to transfer data and pandas to work with the data. pandas UDFs allow vectorized operations that can increase performance up to 100x compared to row-at-a-time Python UDFs.

For background information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.

You define a pandas UDF using the keyword pandas_udf as a decorator and wrap the function with a Python type hint. This article describes the different types of pandas UDFs and shows how to use pandas UDFs with type hints.

Series to Series UDF

You use a Series to Series pandas UDF to vectorize scalar operations. You can use them with APIs such as select and withColumn.

The Python function must accept a pandas Series as an input and return a pandas Series of the same length. Specify these types using Python type hints. Spark runs a pandas UDF by splitting the data into batches of rows, calling the function for each batch, and then concatenating the results.

The following example shows how to create a pandas UDF that computes the product of 2 columns.

Python

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF.
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data.
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame; 'spark' is an existing SparkSession.
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute the function as a Spark vectorized UDF.
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Iterator of Series to Iterator of Series UDF

An iterator UDF is the same as a scalar pandas UDF except that the underlying Python function takes an iterator of batches and returns an iterator of output batches, and the total length of the output must equal the total length of the input.

You specify the Python type hint as Iterator[pandas.Series] -> Iterator[pandas.Series].

This pandas UDF is useful when the UDF execution requires initializing some state, for example, loading a machine learning model file to apply inference to every input batch.

The following example shows how to create a pandas UDF with iterator support.

Python

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column, the input to the underlying
# function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use a context manager to ensure
# resources are released at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize state
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

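The state-initialization pattern described above, loading a machine learning model once and applying it to every batch, might look like the following minimal sketch. The load_model helper, the model path, and the feature column name are hypothetical placeholders rather than part of any specific library.

Python

from typing import Iterator
import pandas as pd
from pyspark.sql.functions import col, pandas_udf

@pandas_udf("double")
def predict_udf(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Initialize expensive state once, before iterating over the batches.
    # `load_model` and the model path are hypothetical placeholders.
    model = load_model("/dbfs/models/my_model.pkl")
    for features in batch_iter:
        # Apply inference to every input batch and yield a Series of predictions.
        yield pd.Series(model.predict(features.to_frame()))

# Hypothetical usage, assuming df has a numeric column named "feature":
# df.select(predict_udf(col("feature"))).show()
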
Iterator of multiple Series to Iterator of Series UDF

An Iterator of multiple Series to Iterator of Series UDF has similar characteristics and restrictions to an Iterator of Series to Iterator of Series UDF. The specified function takes an iterator of batches and outputs an iterator of batches. It is also useful when the UDF execution requires initializing some state.

The differences are that the underlying Python function takes an iterator of a tuple of pandas Series, the wrapped pandas UDF takes multiple Spark columns as input, and you specify the type hints as Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series].

Python

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Series to scalar UDF

Series to scalar pandas UDFs are similar to Spark aggregate functions. A Series to scalar pandas UDF defines an aggregation from one or more pandas Series to a scalar value, where each pandas Series represents a Spark column. You use a Series to scalar pandas UDF with APIs such as select, withColumn, groupBy.agg, and pyspark.sql.Window.

You express the type hint as pandas.Series, ... -> Any. The return type should be a primitive data type, and the returned scalar can be either a Python primitive type, for example, int or float, or a NumPy data type such as numpy.int64 or numpy.float64. Any should ideally be a specific scalar type.

This type of UDF does not support partial aggregation, and all data for each group is loaded into memory.

The following example shows how to use this type of UDF to compute mean with select, groupBy, and window operations:

Python

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF.
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

For detailed usage, see pyspark.sql.functions.pandas_udf.

Usage

Setting Arrow batch size

Note

This configuration has no impact on compute configured with standard access mode and Databricks Runtime 13.3 LTS through 14.2.

Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to high memory usage in the JVM. To avoid possible out-of-memory exceptions, you can adjust the size of the Arrow record batches by setting the spark.sql.execution.arrow.maxRecordsPerBatch configuration to an integer that determines the maximum number of rows for each batch. The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly. Using this limit, each data partition is divided into one or more record batches for processing.
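
For example, you could lower the batch size as shown below; the value 5,000 is purely illustrative and should be tuned for your row width and available memory:

Python

# Cap each Arrow record batch at 5,000 rows (illustrative value; the default is 10,000).
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")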

Timestamp with time zone semantics

Spark internally stores timestamps as UTC values, and timestamp data brought in without a specified time zone is converted as local time to UTC with microsecond resolution.

When timestamp data is exported or displayed in Spark, the session time zone is used to localize the timestamp values. The session time zone is set with the spark.sql.session.timeZone configuration and defaults to the JVM system local time zone. pandas uses a datetime64 type with nanosecond resolution, datetime64[ns], with optional time zone on a per-column basis.

When timestamp data is transferred from Spark to pandas it is converted to nanoseconds and each column is converted to the Spark session time zone then localized to that time zone, which removes the time zone and displays values as local time. This occurs when calling toPandas() or pandas_udf with timestamp columns.

When timestamp data is transferred from pandas to Spark, it is converted to UTC microseconds. This occurs when calling createDataFrame with a pandas DataFrame or when returning a timestamp from a pandas UDF. These conversions are done automatically to ensure Spark has data in the expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond values are truncated.
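
The round trip can be sketched roughly as follows; the session time zone and the timestamp value are arbitrary examples:

Python

import pandas as pd

# Example only: make the session time zone explicit.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

# pandas timestamps have nanosecond resolution (datetime64[ns]).
pdf = pd.DataFrame({"ts": pd.to_datetime(["2024-01-01 12:00:00.123456789"])})

# createDataFrame converts the values to UTC with microsecond resolution;
# the trailing nanoseconds are truncated.
sdf = spark.createDataFrame(pdf)
sdf.show(truncate=False)

# toPandas localizes the values to the session time zone and then drops the
# time zone, so they display as local time.
round_trip = sdf.toPandas()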

A standard UDF loads timestamp data as Python datetime objects, which is different than a pandas timestamp. To get the best performance, we recommend that you use pandas time series functionality when working with timestamps in a pandas UDF. For details, see Time Series / Date functionality.
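
For instance, a pandas UDF can use the pandas .dt accessor directly on the timestamp Series it receives. This is a minimal sketch; the column name and the truncation unit are arbitrary:

Python

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("timestamp")
def truncate_to_hour(ts: pd.Series) -> pd.Series:
    # The batch arrives as datetime64[ns]; pandas time series methods such as
    # .dt.floor operate on the whole Series at once.
    return ts.dt.floor("h")

# Hypothetical usage, assuming a DataFrame df with a timestamp column "event_time":
# df.select(truncate_to_hour("event_time")).show()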

Example notebook

The following notebook illustrates the performance improvements you can achieve with pandas UDFs:

pandas UDFs benchmark notebook
