note
This article covers Databricks Connect for Databricks Runtime 14.1 and above.
This article describes how to execute user-defined functions with Databricks Connect for Scala. Databricks Connect enables you to connect popular IDEs, notebook servers, and custom applications to Databricks clusters. For the Python version of this article, see User-defined functions in Databricks Connect for Python.
For Databricks Runtime 14.1 and above, Databricks Connect for Scala supports running user-defined functions (UDFs).
To run a UDF, the compiled class and the JARs that the UDF requires must be uploaded to the cluster. Use the addCompiledArtifacts() API to specify the compiled class and JAR files to upload.
The following Scala program sets up a simple UDF that squares values in a column.
Scala
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}
In the preceding example, because the UDF is fully contained within Main, only the compiled artifact of Main is added. If the UDF depends on other classes or uses external libraries (that is, JARs), all of those must be included as well.
After the Spark session has been initialized, further compiled classes and JARs can be uploaded using the spark.addArtifact() API.
note
When uploading JARs, all transitive dependency JARs must be included for upload. The APIs do not perform any automatic detection of transitive dependencies.
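As a sketch of what this looks like in practice, each required JAR, including every transitive dependency, is uploaded explicitly on the live session. The paths below are placeholders:

Scala

// Assuming `spark` is an already-initialized DatabricksSession.
// Every transitive dependency JAR must be listed explicitly;
// the paths below are hypothetical.
spark.addArtifact("/path/to/library.jar")
spark.addArtifact("/path/to/transitive-dependency.jar")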
Typed Dataset APIs

The same mechanism described in the preceding section for UDFs also applies to typed Dataset APIs. Typed Dataset APIs let you run transformations such as map, filter, and aggregations on Datasets. Like UDFs, these are executed on the Databricks cluster.
The following Scala application uses the map() API to transform each number in a result column into a prefixed string.
Scala
import com.databricks.connect.DatabricksSession

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    // Encoders for the typed map() transformation.
    import spark.implicits._

    spark.range(3).map(f => s"row-$f").show()
  }
}
While this example uses the map() API, the same applies to other typed Dataset APIs, such as filter() and mapPartitions().
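For example, a filter() transformation on the same session would likewise ship its lambda to the cluster, provided the compiled artifacts were uploaded as shown above:

Scala

// Keep only the even ids; as with map(), the lambda executes on the cluster.
spark.range(10).filter(n => n % 2 == 0).show()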