This project provides an Apache Spark connector for Dgraph databases in Scala and Python. It comes with a Spark Data Source to read graphs from a Dgraph cluster directly into DataFrames, GraphX or GraphFrames. The connector supports filter pushdown, projection pushdown and partitioning along two orthogonal dimensions: predicates and nodes.
Example Scala code:
import org.apache.spark.sql.DataFrame

val target = "localhost:9080"

// GraphX (each of the three sections is meant to be run on its own, e.g. in spark-shell)
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import uk.co.gresearch.spark.dgraph.graphx._

val graph: Graph[VertexProperty, EdgeProperty] = spark.read.dgraph.graphx(target)
val edges: RDD[Edge[EdgeProperty]] = spark.read.dgraph.edges(target)
val vertices: RDD[(VertexId, VertexProperty)] = spark.read.dgraph.vertices(target)

// GraphFrames
import org.graphframes.GraphFrame
import uk.co.gresearch.spark.dgraph.graphframes._

val graph: GraphFrame = spark.read.dgraph.graphframes(target)
val edges: DataFrame = spark.read.dgraph.edges(target)
val vertices: DataFrame = spark.read.dgraph.vertices(target)

// DataFrames
import uk.co.gresearch.spark.dgraph.connector._

val triples: DataFrame = spark.read.dgraph.triples(target)
val edges: DataFrame = spark.read.dgraph.edges(target)
val nodes: DataFrame = spark.read.dgraph.nodes(target)
Example Python code (pyspark ≥3.0, see PySpark Shell and Python script):
from pyspark.sql import DataFrame
from gresearch.spark.dgraph.connector import *

triples: DataFrame = spark.read.dgraph.triples("localhost:9080")
edges: DataFrame = spark.read.dgraph.edges("localhost:9080")
nodes: DataFrame = spark.read.dgraph.nodes("localhost:9080")
The connector provides the following features:
The connector has the following known limitations:
@lang directives.

The Spark Dgraph Connector is available for Spark 3.0, 3.1, 3.2, 3.3, 3.4 and 3.5, with Scala 2.12 and 2.13. Use Maven artifact ID spark-dgraph-connector_2.12 or spark-dgraph-connector_2.13. The Spark version is part of the package version, i.e. 0.12.0-3.0, 0.12.0-3.1, 0.12.0-3.2, 0.12.0-3.3, 0.12.0-3.4 and 0.12.0-3.5, respectively.
Add this line to your build.sbt file to use the latest version for Spark 3.5:
libraryDependencies += "uk.co.gresearch.spark" %% "spark-dgraph-connector" % "0.12.0-3.5"
Add this dependency to your pom.xml file to use the latest version:
<dependency>
  <groupId>uk.co.gresearch.spark</groupId>
  <artifactId>spark-dgraph-connector_2.13</artifactId>
  <version>0.12.0-3.5</version>
</dependency>
Launch the Scala Spark REPL (Spark ≥3.0.0) with the Spark Dgraph Connector dependency (version ≥0.5.0) as follows:
spark-shell --packages uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.12.0-3.5

PySpark Shell and Python script
Launch the Python Spark REPL (pyspark ≥3.0.0) with the Spark Dgraph Connector dependency (version ≥0.5.0) as follows:
pyspark --packages uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.12.0-3.5
Run your Python scripts that use PySpark (pyspark ≥3.0.0) and the Spark Dgraph Connector (version ≥0.5.0) via spark-submit:
spark-submit --packages uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.12.0-3.5 [script.py]
The following examples use a local Dgraph (≥20.03.3) instance set up as described in the Dgraph Quickstart Guide. Run Step 1 to start an instance, then a DROP_ALL (for Dgraph ≥20.07.0 only), Step 2 to load example graph data, and Step 3 to add a schema. These steps are provided in the following scripts:
./dgraph-instance.start.sh
./dgraph-instance.drop-all.sh  # for Dgraph ≥20.07.0 only
./dgraph-instance.schema.sh
./dgraph-instance.insert.sh
The Dgraph version can optionally be set via the DGRAPH_TEST_CLUSTER_VERSION environment variable.
The connection to Dgraph can be established via a target, which is the hostname and gRPC port of a Dgraph Alpha node in the form <hostname>:<port>. With our example instance started above, we can use localhost:9080 as the target.
The Dgraph UI Ratel can be used to query your local Dgraph instance. Open a browser and go to https://play.dgraph.io/?latest. Connect to your local Dgraph via http://localhost:8080.
You can load the entire Dgraph database into an Apache Spark GraphX graph. For example:
import uk.co.gresearch.spark.dgraph.graphx._

val graph = spark.read.dgraph.graphx("localhost:9080")
Example code to perform a PageRank computation on this graph to test that the connector is working:
val pageRank = graph.pageRank(0.0001)
pageRank.vertices.foreach(println)
You can load the entire Dgraph database into a GraphFrames graph. For example:
import org.graphframes.GraphFrame
import uk.co.gresearch.spark.dgraph.graphframes._

val graph: GraphFrame = spark.read.dgraph.graphframes("localhost:9080")
Example code to perform a PageRank computation on this graph to test that the connector is working:
val pageRank = graph.pageRank.maxIter(10)
pageRank.run().triplets.show(false)
Note: Predicates get renamed when they are loaded from the Dgraph database. Any . (dot) in the name is replaced by a _ (underscore). To guarantee uniqueness of names, underscores in the original predicate names are replaced by two underscores. For instance, predicates dgraph.type and release_date become dgraph_type and release__date, respectively.
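The renaming rule from the note can be sketched in plain Python. `spark_column_name` is a hypothetical helper and not part of the connector's API. It only mirrors the stated rule. The order of the two replacements matters: existing underscores are escaped first. For example, a.b and a_b then become a_b and a__b rather than colliding.

```python
def spark_column_name(predicate: str) -> str:
    """Hypothetical helper mirroring the predicate renaming rule (not the connector's API)."""
    # Escape existing underscores first, so they stay distinguishable from replaced dots.
    escaped = predicate.replace("_", "__")
    # Then replace every dot with a single underscore.
    return escaped.replace(".", "_")


print(spark_column_name("dgraph.type"))   # dgraph_type
print(spark_column_name("release_date"))  # release__date
```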
Dgraph data can be loaded into Spark DataFrames in various forms:
You can load the entire Dgraph database as triples into an Apache Spark DataFrame. For example:
import uk.co.gresearch.spark.dgraph.connector._

val triples = spark.read.dgraph.triples("localhost:9080")
The returned DataFrame has the following schema:
root
|-- subject: long (nullable = false)
|-- predicate: string (nullable = true)
|-- objectUid: long (nullable = true)
|-- objectString: string (nullable = true)
|-- objectLong: long (nullable = true)
|-- objectDouble: double (nullable = true)
|-- objectTimestamp: timestamp (nullable = true)
|-- objectBoolean: boolean (nullable = true)
|-- objectGeo: string (nullable = true)
|-- objectPassword: string (nullable = true)
|-- objectType: string (nullable = true)
The object value is stored in exactly one of the object* columns (except objectType), depending on the type of the value. The objectType column provides the type of the object. Here is an example:

This model allows you to store the fully-typed triples in a DataFrame.
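To illustrate the typed model, the plain-Python sketch below builds one typed-triple row as a dictionary, with None standing in for null. The helper `typed_triple` and the type-to-column mapping are assumptions for illustration. Only the type names "string" and "long" appear in this document, in the filter examples. The other names are assumed to follow the column names. The example values come from the query result shown later in this document.

```python
# Assumed mapping from objectType values to the object* column that holds the value.
TYPE_TO_COLUMN = {
    "uid": "objectUid",
    "string": "objectString",
    "long": "objectLong",
    "double": "objectDouble",
    "timestamp": "objectTimestamp",
    "boolean": "objectBoolean",
    "geo": "objectGeo",
    "password": "objectPassword",
}


def typed_triple(subject: int, predicate: str, value, object_type: str) -> dict:
    """Hypothetical helper: one typed-triple row with exactly one object* column set."""
    row = {"subject": subject, "predicate": predicate}
    # Start with every value column empty (None stands in for null).
    row.update(dict.fromkeys(TYPE_TO_COLUMN.values()))
    # Populate only the column that matches the value's type.
    row[TYPE_TO_COLUMN[object_type]] = value
    row["objectType"] = object_type
    return row


row = typed_triple(5, "revenue", 5.34e8, "double")
print(row["objectDouble"], row["objectType"])  # 534000000.0 double
```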
The triples can also be loaded in an un-typed, narrow form:
import uk.co.gresearch.spark.dgraph.connector._

spark
  .read
  .option(TriplesModeOption, TriplesModeStringOption)
  .dgraph.triples("localhost:9080")
  .show
The resulting DataFrame has the following schema:
root
|-- subject: long (nullable = false)
|-- predicate: string (nullable = true)
|-- objectString: string (nullable = true)
|-- objectType: string (nullable = true)
The object value is stored as a string in objectString, and objectType provides you with the actual type of the object. Here is an example:
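The narrow form can be sketched the same way: the value is kept only as a string, next to its type. `string_triple` is a hypothetical helper. The string rendering below is an assumption: Python's `str()`, lower-case booleans and ISO-8601 timestamps. The connector's actual string formats may differ.

```python
from datetime import datetime


def string_triple(subject: int, predicate: str, value, object_type: str) -> dict:
    """Hypothetical helper: the narrow form keeps the value only as a string plus its type."""
    if isinstance(value, bool):
        text = "true" if value else "false"  # assumed boolean rendering
    elif isinstance(value, datetime):
        text = value.isoformat()  # assumed timestamp rendering
    else:
        text = str(value)  # assumed rendering for all other types
    return {"subject": subject, "predicate": predicate,
            "objectString": text, "objectType": object_type}


print(string_triple(4, "revenue", 7.75e8, "double"))
```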
You can load all nodes into a DataFrame in a fully-typed form. This contains all the nodes' properties but no edges to other nodes:
import uk.co.gresearch.spark.dgraph.connector._

spark.read.dgraph.nodes("localhost:9080")
The returned DataFrame has the following schema:
root
|-- subject: long (nullable = false)
|-- predicate: string (nullable = true)
|-- objectString: string (nullable = true)
|-- objectLong: long (nullable = true)
|-- objectDouble: double (nullable = true)
|-- objectTimestamp: timestamp (nullable = true)
|-- objectBoolean: boolean (nullable = true)
|-- objectGeo: string (nullable = true)
|-- objectPassword: string (nullable = true)
|-- objectType: string (nullable = true)
The schema of the returned DataFrame is very similar to the typed triples schema, except that there is no objectUid column linking to other nodes. Here is an example:
Nodes can also be loaded in a wide, fully-typed format:
import uk.co.gresearch.spark.dgraph.connector._

spark
  .read
  .option(NodesModeOption, NodesModeWideOption)
  .dgraph.nodes("localhost:9080")
The returned DataFrame has the following schema format, which depends on the schema of the underlying Dgraph database. Node properties are stored in typed columns and are ordered alphabetically (property columns start after the subject column):
root
|-- subject: long (nullable = false)
|-- dgraph.graphql.schema: string (nullable = true)
|-- dgraph.type: string (nullable = true)
|-- name: string (nullable = true)
|-- release_date: timestamp (nullable = true)
|-- revenue: double (nullable = true)
|-- running_time: long (nullable = true)
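Going from typed node rows to the wide format amounts to a pivot. Each subject becomes one row and each predicate one column, with the columns sorted alphabetically after subject. The plain-Python sketch below shows that pivot. `to_wide` is a hypothetical helper, not the connector's implementation. If a node has several values for one predicate, this sketch simply keeps the last one.

```python
def to_wide(node_triples):
    """Hypothetical sketch: pivot (subject, predicate, value) triples into wide rows."""
    # Property columns are ordered alphabetically and follow the subject column.
    predicates = sorted({predicate for _, predicate, _ in node_triples})
    columns = ["subject"] + predicates
    rows = {}
    for subject, predicate, value in node_triples:
        if subject not in rows:
            # Properties a node does not have stay None (null).
            rows[subject] = dict.fromkeys(columns)
            rows[subject]["subject"] = subject
        # With several values for one predicate, this sketch keeps the last one.
        rows[subject][predicate] = value
    return columns, [rows[subject] for subject in sorted(rows)]


columns, rows = to_wide([
    (5, "revenue", 5.34e8),
    (4, "dgraph.type", "Film"),
    (4, "revenue", 7.75e8),
    (5, "dgraph.type", "Film"),
])
print(columns)  # ['subject', 'dgraph.type', 'revenue']
```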
Note: The graph schema could become very large and therefore the DataFrame could become prohibitively wide.
Note: The Wide Nodes source forces the predicate partitioner to produce a single partition.
Edges can be loaded as follows:
import uk.co.gresearch.spark.dgraph.connector._

spark.read.dgraph.edges("localhost:9080")
The returned DataFrame has the following simple schema:
root
|-- subject: long (nullable = false)
|-- predicate: string (nullable = true)
|-- objectUid: long (nullable = false)
Though there is only a single object column for the destination node, it is called objectUid to align with the DataFrame schemata above.
Predicates marked in the Dgraph schema with the @lang directive can store string values in multiple languages at a time:
{
set {
_:sw3 <title> "Star Wars: Episode VI - Return of the Jedi" .
_:sw3 <title@en> "Star Wars: Episode VI - Return of the Jedi" .
_:sw3 <title@zh> "星際大戰六部曲:絕地大反攻" .
_:sw3 <title@th> "สตาร์ วอร์ส เอพพิโซด 6: การกลับมาของเจได" .
_:sw3 <title@de> "Die Rückkehr der Jedi-Ritter" .
}
}
The connector reads all these languages. Each predicate name contains the language in the form predicate@language:
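Because the language is part of the name, a language can be selected by splitting the name at the @. The hypothetical helper below does this in plain Python for some of the title values from the mutation above. A name without @ has no language tag.

```python
def split_lang(name: str):
    """Hypothetical helper: split 'predicate@language' into (predicate, language or None)."""
    predicate, separator, language = name.partition("@")
    return predicate, (language if separator else None)


# Some predicate names and values from the mutation above.
titles = [
    ("title", "Star Wars: Episode VI - Return of the Jedi"),
    ("title@en", "Star Wars: Episode VI - Return of the Jedi"),
    ("title@de", "Die Rückkehr der Jedi-Ritter"),
]
german = [value for name, value in titles if split_lang(name) == ("title", "de")]
print(german)  # ['Die Rückkehr der Jedi-Ritter']
```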
Dgraph isolates reads from writes through transactions. Since the connector initiates multiple reads while fetching the entire graph (partitioning), writes (called mutations) should be isolated in order to get a consistent snapshot of the graph.
Setting the dgraph.transaction.mode option to "read" causes the connector to read all partitions within the same transaction. However, this causes an exception on the Dgraph cluster when too many mutations occur while reading partitions. With that option set to "none", no such exception occurs, but reads are not isolated from writes.
The connector supports filter pushdown to improve efficiency when reading only sub-graphs. This is supported only in conjunction with the predicate partitioner. Spark filters can only be pushed down for some columns and data sources, because columns may have different meanings. Columns can be of the following types:
The following table lists all supported Spark filters, with examples on different columns:

Spark Filter  Examples
EqualTo       .where($"subject" === 1L)
              .where($"predicate" === "dgraph.type")
              .where($"dgraph.type" === "Person")
              .where($"objectType" === "string")
              .where($"objectLong" === 123)
In            .where($"subject".isin(1L,2L))
              .where($"predicate".isin("release_date", "revenue"))
              .where($"dgraph.type".isin("Person","Film"))
              .where($"objectType".isin("string","long"))
              .where($"objectLong".isin(123,456))
IsNotNull     .where($"dgraph.type".isNotNull)
              .where($"objectLong".isNotNull)
The connector supports projection pushdown to improve efficiency when reading only sub-graphs. A projection in Spark terms is a select operation that selects only a subset of a DataFrame's columns. The Wide Nodes source supports projection pushdown on all predicate value columns.
The following query uses filter and projection pushdown. First we define a wide node DataFrame:
import uk.co.gresearch.spark.dgraph.connector._

val df = spark.read
  .options(Map(
    NodesModeOption -> NodesModeWideOption,
    PartitionerOption -> PredicatePartitionerOption
  ))
  .dgraph.nodes("localhost:9080")
Then we select some columns (projection) and rows (filter):
import spark.implicits._  // enables the $"column" syntax (pre-imported in spark-shell)

df
  .select($"subject", $"`dgraph.type`", $"revenue")  // projection
  .where($"revenue".isNotNull)                       // filter
  .show()
This selects the columns subject, dgraph.type and revenue for only those rows that actually have a value for revenue. The underlying query to Dgraph simplifies from (the full graph):
{
pred1 as var(func: has(<dgraph.graphql.schema>))
pred2 as var(func: has(<dgraph.graphql.xid>))
pred3 as var(func: has(<dgraph.type>))
pred4 as var(func: has(<name>))
pred5 as var(func: has(<release_date>))
pred6 as var(func: has(<revenue>))
pred7 as var(func: has(<running_time>))
result (func: uid(pred1,pred2,pred3,pred4,pred5,pred6,pred7)) {
uid
<dgraph.graphql.schema>
<dgraph.graphql.xid>
<dgraph.type>
<name>
<release_date>
<revenue>
<running_time>
}
}
to (selected predicates and nodes only):
{
pred1 as var(func: has(<revenue>))
result (func: uid(pred1)) {
uid
<dgraph.type>
<revenue>
}
}
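The difference between the two queries can be reproduced with a small plain-Python function that emits the same query shape. It writes one has() variable per filtered predicate and one line per selected predicate. `dgraph_query` is a hypothetical sketch for illustration, not the connector's query builder.

```python
def dgraph_query(has_predicates, selected_predicates):
    """Hypothetical sketch of the query shape shown above (not the connector's query builder)."""
    lines = ["{"]
    # One variable per predicate a node must have (from has() filters).
    for index, predicate in enumerate(has_predicates, start=1):
        lines.append(f"  pred{index} as var(func: has(<{predicate}>))")
    variables = ",".join(f"pred{index}" for index in range(1, len(has_predicates) + 1))
    lines.append(f"  result (func: uid({variables})) {{")
    lines.append("    uid")
    # Only the selected predicates are fetched (projection).
    for predicate in selected_predicates:
        lines.append(f"    <{predicate}>")
    lines.append("  }")
    lines.append("}")
    return "\n".join(lines)


all_predicates = ["dgraph.graphql.schema", "dgraph.graphql.xid", "dgraph.type",
                  "name", "release_date", "revenue", "running_time"]
full_query = dgraph_query(all_predicates, all_predicates)
pushed_query = dgraph_query(["revenue"], ["dgraph.type", "revenue"])
print(pushed_query)
```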
The response is faster as only relevant data are transferred between Dgraph and Spark.
subject  dgraph.type  revenue
4        Film         7.75E8
5        Film         5.34E8
6        Film         5.72E8
9        Film         1.39E8

The connector (Spark ≥3.0 only) collects metrics per partition that provide insights into throughput and timing of the communication with the Dgraph cluster. For each request to Dgraph (a chunk), the number of received bytes, the number of uids and the retrieval time are recorded and summed per partition. The values can be seen on the Spark UI for the respective stages that perform the read:
The connector uses Spark Accumulators to collect these metrics. They can be accessed by the Spark driver via a SparkListener:
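import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import uk.co.gresearch.spark.dgraph.connector._

// Print all accumulables (including the Dgraph metrics) of every completed stage
val handler = new SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    stageCompleted.stageInfo.accumulables.values.foreach(println)
}
spark.sparkContext.addSparkListener(handler)
spark.read.dgraph.triples("localhost:9080").count()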
The following metrics are available:
Metric         Description
Dgraph Bytes   Size of JSON responses from the Dgraph cluster, in bytes.
Dgraph Chunks  Number of requests sent to the Dgraph cluster.
Dgraph Time    Time waited for Dgraph to respond, in seconds.
Dgraph Uids    Number of uids read.
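How the four metrics relate to chunks can be sketched in plain Python. Each chunk contributes its bytes, uids and time, and the chunk count is the number of requests. The `ChunkMetrics` record and the `partition_metrics` function are hypothetical. The connector itself collects these values through Spark accumulators.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ChunkMetrics:
    """Hypothetical record of what a single chunk (one request) reports."""
    received_bytes: int
    uids: int
    seconds: float


def partition_metrics(chunks: List[ChunkMetrics]) -> Dict[str, float]:
    # Each metric is a sum over the partition's chunks; the chunk count is the request count.
    return {
        "Dgraph Bytes": sum(chunk.received_bytes for chunk in chunks),
        "Dgraph Chunks": len(chunks),
        "Dgraph Time": sum(chunk.seconds for chunk in chunks),
        "Dgraph Uids": sum(chunk.uids for chunk in chunks),
    }


print(partition_metrics([ChunkMetrics(2048, 100, 0.5), ChunkMetrics(1024, 40, 0.25)]))
```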
Partitioning your Dgraph graph is essential to be able to load large quantities of graph data into Spark. Spark splits data into partitions, where ideally all partitions have the same size and are of decent size. Partitions that are too large will kill your Spark executor as they won't fit into memory. When partitions are too small your Spark job becomes inefficient and slow, but will not fail.
Each partition connects to the Dgraph cluster and reads a specific sub-graph. Partitions are non-overlapping.
This connector provides various ways to partition your graph. When the default partitioning does not work for your specific use case, try a more appropriate partitioning scheme.
The following Partitioner implementations are available:

Predicate (partitions by predicates): At most P predicates per partition, where P defaults to 1000. Picks multiple predicates from the same Dgraph group. For large graphs where each predicate fits into a partition, otherwise combine with the Uid Range partitioner. Skewness of predicates reflects skewness of partitions.
Uid Range (partitions by uids): Each partition has at most N uids, where N defaults to 1000000. For large graphs where single uids fit into a partition. Can be combined with any predicate partitioner, otherwise induces internal Dgraph cluster communication across groups.
Predicate + Uid Range, the default (partitions by predicates + uids): Partitions by predicate first (see Predicate Partitioner), then each partition gets partitioned by uid (see Uid Partitioner). For graphs of any size.

Partitioning by Predicates
The Dgraph data can be partitioned by predicates. Each partition then contains a distinct set of predicates. The number of predicates per partition can be configured via dgraph.partitioner.predicate.predicatesPerPartition, which defaults to 1000.
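A plain-Python sketch of predicate partitioning follows. It assumes the assignment of predicates to Dgraph groups is already known. Predicates of one group are split into partitions of at most P predicates, and this sketch never mixes predicates from different groups. `predicate_partitions` and the example group assignment are hypothetical.

```python
from typing import Dict, List, Tuple


def predicate_partitions(predicates_by_group: Dict[int, List[str]],
                         predicates_per_partition: int = 1000) -> List[Tuple[int, List[str]]]:
    """Hypothetical sketch: group-local partitions of at most P predicates each."""
    partitions = []
    for group in sorted(predicates_by_group):
        predicates = sorted(predicates_by_group[group])
        # Split this group's predicates into slices of at most P; groups are never mixed.
        for start in range(0, len(predicates), predicates_per_partition):
            partitions.append((group, predicates[start:start + predicates_per_partition]))
    return partitions


# Hypothetical assignment of the example predicates to two Dgraph groups, with P = 2.
groups = {1: ["revenue", "dgraph.type", "name"], 2: ["running_time", "release_date"]}
for group, predicates in predicate_partitions(groups, predicates_per_partition=2):
    print(group, predicates)
```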
Predicate partitions connect only to alpha nodes that contain those predicates. Hence, these reads are all local to the alpha nodes and induce no internal Dgraph cluster communication.
A uid represents a node or vertex in Dgraph terminology. A "Uid Range" partitioning splits the graph by the subject of the graph triples. This can be combined with predicate partitioning, which serves as an orthogonal partitioning. Without predicate partitioning, uid partitioning induces internal Dgraph cluster communication across the groups.
The uid partitioning always works on top of a predicate partitioner. If none is defined, a singleton partitioner is used. The number of uids of each underlying partition has to be estimated. Once the number of uids is estimated, the partition is further split into ranges of that uid space.
The space of existing uids is split into ranges of N uids per partition. N defaults to 1000000 and can be configured via dgraph.partitioner.uidRange.uidsPerPartition. The uids are allocated to partitions in ascending order. Such a split will not be done if more than dgraph.partitioner.uidRange.maxPartitions partitions would be created. This defaults to 10000. If vertex size is skewed and a function of uid, then partitions will be skewed as well.
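The split of the uid space can be sketched in Python. The sketch assumes the estimated uid space is the half-open interval [0, estimated_uids). `uid_ranges` is a hypothetical helper. Its defaults mirror dgraph.partitioner.uidRange.uidsPerPartition (1000000) and dgraph.partitioner.uidRange.maxPartitions (10000). Returning the unsplit range when maxPartitions would be exceeded is how this sketch interprets "such a split will not be done".

```python
from typing import List, Tuple


def uid_ranges(estimated_uids: int, uids_per_partition: int = 1_000_000,
               max_partitions: int = 10_000) -> List[Tuple[int, int]]:
    """Hypothetical sketch: half-open uid ranges [start, end) in ascending order."""
    count = -(-estimated_uids // uids_per_partition)  # ceiling division
    if count > max_partitions:
        # Splitting would exceed maxPartitions, so the partition is left unsplit.
        return [(0, estimated_uids)]
    return [(start, min(start + uids_per_partition, estimated_uids))
            for start in range(0, estimated_uids, uids_per_partition)]


print(uid_ranges(2_500_000))
```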
Note: With uid partitioning, the chunk size configured via dgraph.chunkSize should be at least a tenth of the number of uids per partition configured via dgraph.partitioner.uidRange.uidsPerPartition to avoid inefficiency due to chunks overlapping with partition borders. When your result is sparse with respect to the uid space, set the chunk size to a hundredth or less.
The connector reads each partition from Dgraph in a streamed fashion. It splits up a partition into smaller chunks, where each chunk contains 100000 uids. This chunk size can be configured via dgraph.chunkSize. Each chunk sends a single query to Dgraph. The chunk size limits the size of the result. Due to the low memory footprint of the connector, Spark could read your entire graph via a single partition (you would have to repartition the read DataFrame to make Spark shuffle the data properly). This would be slow, but it shows that the connector can handle graphs of any size with a fixed executor memory requirement.
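The streamed read can be sketched as a generator that issues one query per chunk. After each chunk it resumes after the last uid it has seen, so the consumer holds at most one chunk of results at a time. The `fetch(after, first)` callback is a hypothetical stand-in for a Dgraph query. Paging with `after` and `first` is an assumption about how chunks are requested, not a description of the connector's code.

```python
from typing import Callable, Iterator, List

Fetch = Callable[[int, int], List[int]]


def stream_partition(fetch: Fetch, chunk_size: int = 100_000) -> Iterator[List[int]]:
    """Hypothetical sketch: read a partition chunk by chunk, one query per chunk."""
    after = 0
    while True:
        chunk = fetch(after, chunk_size)  # one query returning at most chunk_size uids
        if not chunk:
            return
        yield chunk
        if len(chunk) < chunk_size:
            return  # a short chunk means the partition is exhausted
        after = chunk[-1]  # resume after the last uid seen


# Hypothetical stand-in for Dgraph: a partition holding uids 1 to 25.
partition_uids = list(range(1, 26))


def fake_fetch(after: int, first: int) -> List[int]:
    return [uid for uid in partition_uids if uid > after][:first]


chunks = list(stream_partition(fake_fetch, chunk_size=10))
print([len(chunk) for chunk in chunks])  # [10, 10, 5]
```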
The connector uses Spark's standard Log4j logging framework. Add the following line to your log4j.properties to set the log level of the connector specifically:
log4j.logger.uk.co.gresearch.spark.dgraph.connector=DEBUG
See SPARK_HOME/conf/log4j.properties.template for a template file.
Some unit tests require a Dgraph (≥20.03.3) cluster running at localhost:9080. It has to be set up as described in the Examples section. If that cluster is not running, the unit tests will launch and set up such a cluster for you. This requires docker to be installed on your machine and makes the tests take longer. If you run those tests frequently, it is recommended that you set up the cluster yourself.
You can set the Dgraph version that is started automatically via the environment variable DGRAPH_TEST_CLUSTER_VERSION. The default version is defined in uk.co.gresearch.spark.dgraph.DgraphTestCluster.DgraphDefaultVersion.
The Python code can be tested with pytest:
PYTHONPATH="python:python/test" python -m pytest python/test