Since the Spark 3.4.0 release, Spark SQL provides built-in support for reading and writing protobuf data.

## Deploying

The `spark-protobuf` module is external and is not included in `spark-submit` or `spark-shell` by default.

As with any Spark application, `spark-submit` is used to launch your application. `spark-protobuf_2.13` and its dependencies can be added directly to `spark-submit` using `--packages`, such as:

```shell
./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.13:4.0.0 ...
```

For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-protobuf_2.13` and its dependencies directly:

```shell
./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.13:4.0.0 ...
```

See the Application Submission Guide for more details about submitting applications with external dependencies.
## to_protobuf() and from_protobuf()

The `spark-protobuf` package provides the function `to_protobuf` to encode a column as binary in protobuf format, and `from_protobuf()` to decode protobuf binary data into a column. Both functions transform one column into another column, and the input/output SQL data type can be a complex type or a primitive type.

Using a protobuf message as a column is useful when reading from or writing to a streaming source like Kafka. Each Kafka key-value record is augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, and so on. You can use `from_protobuf()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a different sink. `to_protobuf()` can be used to turn structs into protobuf messages. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.

The Spark SQL schema is generated based on the protobuf descriptor file or protobuf class passed to `from_protobuf` and `to_protobuf`. The specified protobuf class or protobuf descriptor file must match the data; otherwise, the behavior is undefined: it may fail or return arbitrary results.
```python
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

# from_protobuf and to_protobuf provide two schema choices: via a Protobuf
# descriptor file, or via a shaded Java class.
# Given the input .proto protobuf schema:
# syntax = "proto3"
# message AppEvent {
#   string name = 1;
#   int64 id = 2;
#   string context = 3;
# }

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The protoc command can be used to generate a protobuf descriptor file for a given .proto file.
output = df \
  .select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event")) \
  .where('event.name == "alice"') \
  .select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))

# Alternatively, you can decode and encode the SQL columns into protobuf format using the protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary results. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df \
  .select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event")) \
  .where('event.name == "alice"')

output.printSchema()
# root
#  |-- event: struct (nullable = true)
#  |    |-- name: string (nullable = true)
#  |    |-- id: long (nullable = true)
#  |    |-- context: string (nullable = true)

output = output \
  .select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))

query = output \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic2") \
  .start()
```
```scala
import org.apache.spark.sql.protobuf.functions._

// `from_protobuf` and `to_protobuf` provide two schema choices: via the protobuf descriptor file,
// or via a shaded Java class.
// Given the input .proto protobuf schema:
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The protoc command can be used to generate a protobuf descriptor file for a given .proto file.
val output = df
  .select(from_protobuf($"value", "AppEvent", descriptorFilePath) as "event")
  .where("event.name == \"alice\"")
  .select(to_protobuf($"event", "AppEvent", descriptorFilePath) as "event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

// Alternatively, you can decode and encode the SQL columns into protobuf format using the protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary results. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output2 = df
  .select(from_protobuf($"value", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as "event")
  .where("event.name == \"alice\"")

output2.printSchema()
// root
//  |-- event: struct (nullable = true)
//  |    |-- name: string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output2 = output2
  .select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as "event")

val query2 = output2
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
```
```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;

// `from_protobuf` and `to_protobuf` provide two schema choices: via the protobuf descriptor file,
// or via a shaded Java class.
// Given the input .proto protobuf schema:
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The protoc command can be used to generate a protobuf descriptor file for a given .proto file.
Dataset<Row> output = df
  .select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
  .where("event.name == \"alice\"")
  .select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));

// Alternatively, you can decode and encode the SQL columns into protobuf format using the protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary results. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
output = df
  .select(
    from_protobuf(col("value"),
      "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
  .where("event.name == \"alice\"");

output.printSchema();
// root
//  |-- event: struct (nullable = true)
//  |    |-- name: string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(
  to_protobuf(col("event"),
    "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
```
## Supported types for Protobuf -> Spark SQL conversion

Currently Spark supports reading protobuf scalar types, enum types, nested types, and map types under messages of Protobuf. In addition to these types, `spark-protobuf` also introduces support for Protobuf `OneOf` fields, which allow you to handle messages that can have multiple possible sets of fields, but only one set present at a time. This is useful when the data you are working with is not always in the same format and you need to handle messages with different sets of fields without encountering errors.
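As a plain-Python sketch (not the Spark API), a `OneOf` field maps to a set of nullable struct fields where at most one member is non-null per record; the member names below are hypothetical:

```python
# Plain-Python model of how a protobuf `oneof` decodes into a struct:
# every member of the oneof becomes a nullable field, and at most one
# member is non-null for any given record. The `contact` oneof with
# members `email` and `phone` is a hypothetical example.

def decode_oneof(which, value, members=("email", "phone")):
    """Return a dict with one member set and the rest None."""
    if which not in members:
        raise ValueError(f"unknown oneof member: {which}")
    return {m: (value if m == which else None) for m in members}

row = decode_oneof("email", "alice@example.com")
# {'email': 'alice@example.com', 'phone': None}
```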
It also supports reading the following Protobuf types: Timestamp and Duration.

| Protobuf logical type | Protobuf schema | Spark SQL type |
| --------------------- | --------------- | -------------- |
| duration | MessageType{seconds: Long, nanos: Int} | DayTimeIntervalType |
| timestamp | MessageType{seconds: Long, nanos: Int} | TimestampType |

## Supported types for Spark SQL -> Protobuf conversion

Spark supports writing all Spark SQL types into Protobuf. For most types, the mapping from Spark types to Protobuf types is straightforward (e.g. IntegerType gets converted to int):

| Spark SQL type | Protobuf type |
| -------------- | ------------- |
| BooleanType | boolean |
| IntegerType | int |
| LongType | long |
| FloatType | float |
| DoubleType | double |
| StringType | string |
| StringType | enum |
| BinaryType | bytes |
| StructType | message |
| ArrayType | repeated |
| MapType | map |

## Handling circular references in protobuf fields

One common issue when working with Protobuf data is the presence of circular references. In Protobuf, a circular reference occurs when a field refers back to itself, or to another field that refers back to the original field. This can cause issues when parsing the data, as it can result in infinite loops or other unexpected behavior. To address this issue, spark-protobuf can check for circular references through field types: users can set the `recursive.fields.max.depth` option to specify the maximum number of levels of recursion to allow when parsing the schema. By default, `spark-protobuf` does not permit recursive fields, setting `recursive.fields.max.depth` to -1. However, you can set this option to a value from 1 to 10 if needed.

Setting `recursive.fields.max.depth` to 1 drops all recursive fields, setting it to 2 allows a field to be recursed once, and setting it to 3 allows it to be recursed twice. A `recursive.fields.max.depth` value greater than 10 is not allowed, as it can lead to performance issues and even stack overflows.
The SQL schema for the protobuf message below will vary based on the value of `recursive.fields.max.depth`.
```protobuf
syntax = "proto3";
message Person {
  string name = 1;
  Person bff = 2;
}

// The protobuf schema defined above would be converted into Spark SQL columns with the
// following structure, based on the `recursive.fields.max.depth` value:
1: struct<name: string>
2: struct<name: string, bff: struct<name: string>>
3: struct<name: string, bff: struct<name: string, bff: struct<name: string>>>
...
```
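The truncation above can be modeled with a short plain-Python sketch (not the Spark implementation), where `max_depth` plays the role of `recursive.fields.max.depth`:

```python
# Plain-Python model of how `recursive.fields.max.depth` truncates the
# recursive `Person` schema shown above. At depth 1 the recursive `bff`
# field is dropped; each extra level keeps one more nesting of `bff`.

def person_schema(max_depth):
    if max_depth <= 1:
        return "struct<name: string>"
    return f"struct<name: string, bff: {person_schema(max_depth - 1)}>"

print(person_schema(1))  # struct<name: string>
print(person_schema(2))  # struct<name: string, bff: struct<name: string>>
```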
## Data Source Options

Data source options for Protobuf can be set via the options parameter of `from_protobuf` and `to_protobuf`.

| Property Name | Default | Meaning | Scope |
| ------------- | ------- | ------- | ----- |
| mode | FAILFAST | Allows a mode for dealing with corrupt records during parsing.<br>`PERMISSIVE`: when it meets a corrupted record, sets all fields to null.<br>`DROPMALFORMED`: ignores the whole corrupted record. This mode is unsupported in the Protobuf built-in functions.<br>`FAILFAST`: throws an exception when it meets corrupted records. | read |
| recursive.fields.max.depth | -1 | Specifies the maximum number of recursion levels to allow when parsing the schema. For more details, refer to the section Handling circular references in protobuf fields. | read |
| convert.any.fields.to.json | false | Enables converting Protobuf `Any` fields to JSON. This option should be enabled carefully: JSON conversion and processing are inefficient, and schema safety is reduced, making downstream processing error-prone. | read |
| emit.default.values | false | Whether to render fields with zero values when deserializing Protobuf to a Spark struct. When a field is empty in the serialized Protobuf, this library deserializes it as null by default; this option controls whether to render the type-specific zero values instead. | read |
| enums.as.ints | false | Whether to render enum fields as their integer values. When this option is set to false, an enum field is mapped to StringType and the value is the name of the enum; when set to true, an enum field is mapped to IntegerType and the value is its integer value. | read |
| upcast.unsigned.ints | false | Whether to upcast unsigned integers into a larger type. When set to true, LongType is used for uint32 and Decimal(20, 0) is used for uint64, so their representation can contain large unsigned values without overflow. | read |
| unwrap.primitive.wrapper.types | false | Whether to unwrap the struct representation for well-known primitive wrapper types when deserializing. By default, the wrapper types for primitives (i.e. google.protobuf.Int32Value, google.protobuf.Int64Value, etc.) are deserialized as structs. | read |
| retain.empty.message.types | false | Whether to retain fields of empty proto message types in the schema. Since Spark doesn't allow writing an empty StructType, an empty proto message type is dropped by default. Setting this option to true inserts a dummy column (`__dummy_field_in_empty_struct`) into the empty proto message so that the empty message fields are retained. | read |
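To make the deserialization options above concrete, here is a plain-Python sketch (not the Spark implementation) of the `emit.default.values` and `enums.as.ints` semantics, using a hypothetical enum:

```python
# Plain-Python model of two deserialization options.
# For a proto3 scalar field that is absent/zero on the wire:
#   emit.default.values=False -> the struct field becomes None (null)
#   emit.default.values=True  -> the struct field becomes the type's zero value
# For an enum field:
#   enums.as.ints=False -> the enum's name (StringType)
#   enums.as.ints=True  -> the enum's number (IntegerType)

ENUM_NAMES = {0: "UNKNOWN", 1: "STARTED"}  # hypothetical enum definition

def decode_int_field(raw, emit_default_values=False):
    if raw == 0 and not emit_default_values:
        return None
    return raw

def decode_enum_field(number, enums_as_ints=False):
    return number if enums_as_ints else ENUM_NAMES[number]

print(decode_int_field(0))                            # None
print(decode_int_field(0, emit_default_values=True))  # 0
print(decode_enum_field(1))                           # STARTED
print(decode_enum_field(1, enums_as_ints=True))       # 1
```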