This post covers core concepts of Apache Spark such as RDDs, the DAG, the execution workflow, how stages of tasks are formed, and the shuffle implementation, and also describes the architecture and the main components of the Spark Driver. There’s a github.com/datastrophic/spark-workshop project created alongside this post which contains example Spark applications and a dockerized Hadoop environment to play with. Slides are also available on Slideshare.
Intro
Spark is a generalized framework for distributed data processing providing a functional API for manipulating data at scale, in-memory data caching, and reuse across computations. It applies a set of coarse-grained transformations over partitioned data and relies on the dataset lineage to recompute tasks in case of failures. It is worth mentioning that Spark supports the majority of data formats, has integrations with various storage systems, and can be executed on Mesos or YARN.
A powerful and concise API in conjunction with rich libraries makes it easier to perform data operations at scale, e.g. performing backup and restore of Cassandra column families in Parquet format:
import com.datastax.spark.connector._   //adds cassandraTable()/saveToCassandra() to SparkContext and RDDs
import sqlContext.implicits._           //adds toDF() to RDDs of case classes

def backup(path: String, config: Config) {
  sc.cassandraTable(config.keyspace, config.table)
    .map(_.toEvent).toDF()
    .write.parquet(path)
}

def restore(path: String, config: Config) {
  sqlContext.read.parquet(path)
    .map(_.toEvent)
    .saveToCassandra(config.keyspace, config.table)
}
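Here Config, the toEvent mapping, and the implicit sc and sqlContext are assumed to come from the surrounding application (such as the spark-workshop project mentioned above).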
Or run a discrepancy analysis comparing the data in different data stores:
sqlContext.sql {
  """
    |SELECT count(*)
    |FROM cassandra_event_rollups
    |JOIN mongo_event_rollups
    |  ON cassandra_event_rollups.uuid = mongo_event_rollups.uuid
    |WHERE cassandra_event_rollups.value != mongo_event_rollups.value
  """.stripMargin
}
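The query above assumes both datasets were registered as temporary tables beforehand. A minimal sketch of how the Cassandra side could be registered (the rollups table name and the toRollup mapping are illustrative, not from the post), with the MongoDB collection registered analogously through its own Spark connector:

sc.cassandraTable("demo", "event_rollups")       //hypothetical rollups table
  .map(_.toRollup).toDF()                        //toRollup is an assumed case-class mapping
  .registerTempTable("cassandra_event_rollups")  //the name referenced in the SQL query above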
Recap
Spark is built around the concepts of Resilient Distributed Datasets and Directed Acyclic Graphs representing transformations and dependencies between them.
A Spark Application (often referred to as a Driver Program or Application Master) at a high level consists of a SparkContext and user code which interacts with it, creating RDDs and performing a series of transformations to achieve the final result. These transformations of RDDs are then translated into a DAG and submitted to the Scheduler to be executed on a set of worker nodes.
RDD: Resilient Distributed Datasets
An RDD can be thought of as an immutable parallel data structure with failure-recovery possibilities. It provides an API for various transformations and materializations of data as well as for control over caching and partitioning of elements to optimize data placement. An RDD can be created either from external storage or from another RDD, and it stores information about its parents to optimize execution (via pipelining of operations) and to recompute partitions in case of failure.
From a developer’s point of view, an RDD represents distributed immutable data (partitioned data + iterator) and lazily evaluated operations (transformations). As an interface, an RDD defines five main properties:
//a list of partitions (e.g. splits in Hadoop)
def getPartitions: Array[Partition]
//a list of dependencies on other RDDs
def getDependencies: Seq[Dependency[_]]
//a function for computing each split
def compute(split: Partition, context: TaskContext): Iterator[T]
//(optional) a list of preferred locations to compute each split on
def getPreferredLocations(split: Partition): Seq[String] = Nil
//(optional) a partitioner for key-value RDDs
val partitioner: Option[Partitioner] = None
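To illustrate how these properties fit together, here’s a minimal custom RDD sketch (the class and parameter names are made up for this example, not part of Spark) that produces a range of numbers split across a fixed number of partitions:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

//a simple Partition implementation carrying just its index
class SlicePartition(override val index: Int) extends Partition

class RangeChunkRDD(sc: SparkContext, n: Int, numSlices: Int)
  extends RDD[Int](sc, Nil) {                    //Nil: no parent RDDs, hence no dependencies

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numSlices)(i => new SlicePartition(i))

  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val chunk = n / numSlices
    val start = split.index * chunk
    val end   = if (split.index == numSlices - 1) n else start + chunk
    (start until end).iterator                   //each task computes its own slice of the range
  }
  //getDependencies, getPreferredLocations and partitioner keep their default implementations
}

//usage (hypothetical): new RangeChunkRDD(sc, 1000, 4).collect()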
Here’s an example of the RDDs created during a call to sparkContext.textFile("hdfs://..."), which first loads HDFS blocks into memory and then applies a map() function to drop the keys, creating two RDDs:
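A simplified sketch of roughly what happens under the hood (based on how textFile is implemented on top of the Hadoop input format API):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.rdd.RDD

//hadoopFile() produces a HadoopRDD of (byte offset, line) pairs,
//and the subsequent map() produces a MapPartitionsRDD[String] holding just the line values
val lines: RDD[String] = sc
  .hadoopFile("hdfs://...", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .map(pair => pair._2.toString)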
RDD Operations
Operations on RDDs are divided into several groups: transformations, which are lazy operations that build up a new RDD and its lineage, and actions, which launch a computation to return a value to the driver program or to write data out to external storage.
Here’s a code sample of a job which aggregates data from Cassandra in lambda style, combining previously rolled-up data with the data from raw storage, and demonstrates some of the transformations and actions available on RDDs:
//aggregate events after specific date for given campaign
val events =
sc.cassandraTable("demo", "event")
.map(_.toEvent)
.filter { e =>
e.campaignId == campaignId && e.time.isAfter(watermark)
}
.keyBy(_.eventType)
.reduceByKey(_ + _)
.cache()
//aggregate campaigns by type
val campaigns =
sc.cassandraTable("demo", "campaign")
.map(_.toCampaign)
.filter { c =>
c.id == campaignId && c.time.isBefore(watermark)
}
.keyBy(_.eventType)
.reduceByKey(_ + _)
.cache()
//joined rollups and raw events
val joinedTotals = campaigns.join(events)
.map { case (key, (campaign, event)) =>
CampaignTotals(campaign, event)
}
.collect()
//count totals separately
val eventTotals =
events.map{ case (t, e) => s"$t -> ${e.value}" }
.collect()
val campaignTotals =
campaigns.map{ case (t, e) => s"$t -> ${e.value}" }
.collect()
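Note that both events and campaigns are cached because each of them is reused by more than one action (the join plus the separate per-type counts), while the collect() calls are the actions that actually trigger computation and bring the results back to the driver.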
Execution workflow recap
Here’s a quick recap of the execution workflow before digging deeper into the details: user code containing RDD transformations forms a directed acyclic graph (DAG), which is then split into stages of tasks by the DAGScheduler. Stages combine tasks that don’t require shuffling/repartitioning of the data. Tasks run on workers, and the results are then returned to the client.
DAG
Here’s the DAG for the code sample above. Basically, any data-processing workflow can be defined as reading the data source, applying a set of transformations, and materializing the result in different ways. Transformations create dependencies between RDDs, and here we can see their different types.
The dependencies are usually classified as “narrow” and “wide”: with a narrow dependency, each partition of the parent RDD is used by at most one partition of the child RDD (e.g. map or filter, which can be pipelined), whereas with a wide dependency, multiple child partitions may depend on a single parent partition (e.g. groupByKey or reduceByKey), which requires a shuffle.
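To make the distinction concrete, here’s a small sketch (not from the post) that inspects the dependency types directly: mapValues yields a narrow, one-to-one dependency, while reduceByKey yields a wide, shuffle dependency:

val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val mapped  = pairs.mapValues(_ * 10)           //narrow: each child partition reads exactly one parent partition
val reduced = mapped.reduceByKey(_ + _)         //wide: child partitions may read data from many parent partitions

println(mapped.dependencies)                    //e.g. List(org.apache.spark.OneToOneDependency@...)
println(reduced.dependencies)                   //e.g. List(org.apache.spark.ShuffleDependency@...)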
Spark stages are created by breaking the RDD graph at shuffle boundaries: pipelined (narrow) transformations are grouped into a single stage, and the tasks of each stage invoke the RDD.compute() functions of the various RDDs in that stage. There are two types of tasks in Spark: ShuffleMapTask, which partitions its input for a shuffle, and ResultTask, which sends its output to the driver. The same applies to the types of stages: ShuffleMapStage and ResultStage, correspondingly.
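For illustration, here’s a small word-count-style sketch (not from the post): the reduceByKey call introduces a shuffle dependency, so the DAG is split into two stages, the first executed by ShuffleMapTasks and the second, final stage by ResultTasks:

val counts = sc.textFile("hdfs://...")          //stage 0: reading plus narrow, pipelined transformations
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)                           //shuffle boundary: a new stage starts here

println(counts.toDebugString)                   //prints the RDD lineage with shuffle boundaries marked
counts.collect()                                //ResultTasks of the final stage send their output to the driver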
During the shuffle, ShuffleMapTask writes blocks to the local disk, and tasks in the next stage then fetch these blocks over the network.
In Spark, Sort Shuffle has been the default since version 1.2, but Hash Shuffle is available as well.
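The shuffle implementation can also be selected explicitly via configuration; a sketch for the Spark 1.x releases this post targets (newer releases dropped Hash Shuffle, so check your version before relying on this property):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-demo")                   //app name is illustrative
  .set("spark.shuffle.manager", "sort")         //"sort" (the default since 1.2) or "hash"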
Sort Shuffle
With Sort Shuffle, each map task writes its output, sorted by the target partition id, into a single data file accompanied by an index file, and reducers fetch only the relevant ranges of those files. This keeps the number of intermediate files much lower than in Hash Shuffle, which creates a separate file per reducer for every map task.
At a 10,000-foot view there are three major components: the Spark Driver, the Executors, and the Cluster Manager (e.g. YARN or Mesos, as mentioned above).
The Spark Driver contains more components responsible for translating user code into actual jobs executed on the cluster: SparkContext, DAGScheduler, TaskScheduler, SchedulerBackend, and BlockManager.
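To make the driver side concrete, here’s a minimal driver-program sketch (the app name and the trivial job are illustrative): creating the SparkContext is what wires up the scheduling components on the driver, and the single action below is turned into a DAG of stages and tasks by them:

import org.apache.spark.{SparkConf, SparkContext}

object MinimalDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("driver-components-demo")
    val sc = new SparkContext(conf)              //wires up DAGScheduler, TaskScheduler, SchedulerBackend, BlockManager
    try {
      //a trivial job: its lineage is turned into a DAG of stages by the DAGScheduler,
      //and the resulting tasks are launched on executors via the TaskScheduler
      val sum = sc.parallelize(1 to 1000, 4).map(_ * 2).reduce(_ + _)
      println(s"sum = $sum")
    } finally {
      sc.stop()
    }
  }
}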
Executors run as Java processes, so the available memory is equal to the heap size. Internally, the available memory is split into several regions with specific functions.
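For example, on the Spark 1.x releases this post targets, the sizes of those regions are driven by settings like the following (a sketch; the fraction values shown are assumed defaults of that era, so check the documentation for your version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "4g")           //total executor JVM heap
  .set("spark.storage.memoryFraction", "0.6")   //share of the heap usable for cached RDD blocks
  .set("spark.shuffle.memoryFraction", "0.2")   //share of the heap usable for shuffle aggregation buffers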