
User-defined aggregate functions (UDAFs)

Applies to: Databricks Runtime

User-defined aggregate functions (UDAFs) are user-programmable routines that act on multiple rows at once and return a single aggregated value as a result. This documentation lists the classes that are required for creating and registering UDAFs. It also contains examples that demonstrate how to define and register UDAFs in Scala and invoke them in Spark SQL.

Aggregator

Syntax: Aggregator[-IN, BUF, OUT]

A base class for user-defined aggregations, which can be used in Dataset operations to take all of the elements of a group and reduce them to a single value.
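The three type parameters are the input row type (IN), the intermediate buffer type (BUF), and the final result type (OUT). For example, a minimal sum aggregator, in which all three parameters happen to be Long, might look like the following sketch (the LongSum name is illustrative, not part of the API):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Illustrative sketch: a minimal sum aggregator with IN = BUF = OUT = Long
object LongSum extends Aggregator[Long, Long, Long] {
  // Empty buffer: the identity value for the aggregation
  def zero: Long = 0L
  // Fold one input value into the buffer
  def reduce(buffer: Long, value: Long): Long = buffer + value
  // Combine partial buffers computed on different partitions
  def merge(b1: Long, b2: Long): Long = b1 + b2
  // Transform the final buffer into the output value
  def finish(reduction: Long): Long = reduction
  // Encoders for the buffer and output types
  val bufferEncoder: Encoder[Long] = Encoders.scalaLong
  val outputEncoder: Encoder[Long] = Encoders.scalaLong
}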

Examples

Type-safe user-defined aggregate functions

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:

Scala

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)

  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }

  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

  // Specifies the Encoder for the intermediate value type
  val bufferEncoder: Encoder[Average] = Encoders.product

  // Specifies the Encoder for the final output value type
  val outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
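
The typed aggregator can then be converted to a TypedColumn and applied to a Dataset. A usage sketch, assuming a SparkSession named spark and the same employees.json sample data used in the Java example below:

import spark.implicits._

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the aggregator to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+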

Java

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Getters and setters are required by Encoders.bean
  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
  public long getSalary() { return salary; }
  public void setSalary(long salary) { this.salary = salary; }
}

public static class Average implements Serializable {
  private long sum;
  private long count;

  // A no-arg constructor, getters, and setters are required by Encoders.bean
  public Average() {}
  public Average(long sum, long count) { this.sum = sum; this.count = count; }
  public long getSum() { return sum; }
  public void setSum(long sum) { this.sum = sum; }
  public long getCount() { return count; }
  public void setCount(long count) { this.count = count; }
}

public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }

  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }

  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }

  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }

  // Specifies the Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }

  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().format("json").load(path).as(employeeEncoder);
ds.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();

// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Untyped user-defined aggregate functions

Typed aggregations, as described above, may also be registered as untyped aggregating UDFs for use with DataFrames. For example, a user-defined average for untyped DataFrames can look like:

Scala

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Long, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)

  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, data: Long): Average = {
    buffer.sum += data
    buffer.count += 1
    buffer
  }

  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

  // Specifies the Encoder for the intermediate value type
  val bufferEncoder: Encoder[Average] = Encoders.product

  // Specifies the Encoder for the final output value type
  val outputEncoder: Encoder[Double] = Encoders.scalaDouble
}


// Register the function so it can be called from Spark SQL
spark.udf.register("myAverage", functions.udaf(MyAverage))

val df = spark.read.format("json").load("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Java

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.functions;

public static class Average implements Serializable {
  private long sum;
  private long count;

  // A no-arg constructor, getters, and setters are required by Encoders.bean
  public Average() {}
  public Average(long sum, long count) { this.sum = sum; this.count = count; }
  public long getSum() { return sum; }
  public void setSum(long sum) { this.sum = sum; }
  public long getCount() { return count; }
  public void setCount(long count) { this.count = count; }
}

public static class MyAverage extends Aggregator<Long, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }

  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Long data) {
    long newSum = buffer.getSum() + data;
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }

  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }

  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }

  // Specifies the Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }

  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}


// Register the function so it can be called from Spark SQL
spark.udf().register("myAverage", functions.udaf(new MyAverage(), Encoders.LONG()));

Dataset<Row> df = spark.read().format("json").load("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

SQL


-- Compile the Aggregator class and place it in a JAR file, here /tmp/MyAverage.jar
CREATE FUNCTION myAverage AS 'MyAverage' USING JAR '/tmp/MyAverage.jar';

SHOW USER FUNCTIONS;
+------------------+
|          function|
+------------------+
| default.myAverage|
+------------------+

CREATE TEMPORARY VIEW employees
USING org.apache.spark.sql.json
OPTIONS (
path "examples/src/main/resources/employees.json"
);

SELECT * FROM employees;
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

SELECT myAverage(salary) as average_salary FROM employees;
+--------------+
|average_salary|
+--------------+
|        3750.0|
+--------------+
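
As an alternative to SQL registration, an untyped Aggregator can also be invoked directly through the DataFrame API. A Scala sketch, reusing the MyAverage object and the df DataFrame from the untyped Scala example above (the salary column name comes from the sample data):

import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.col

// Wrap the Aggregator as an untyped UserDefinedFunction and apply it to a column
val average = functions.udaf(MyAverage)
df.select(average(col("salary")).as("average_salary")).show()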
