3.4 RDD的计算
3.4.1 Ta s k简介
原始的RDD经过一系列转换后,会在最后一个RDD上触发一个动作,这个动作会生成一个Job。在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算。计算节点执行计算逻辑的部分称为Executor。Executor在准备好Task的运行时环境后,会通过调用org.apache.spark.scheduler.Task#run来执行计算。Spark的Task分为两种:
1)org.apache.spark.scheduler.ShuffleMapTask
2)org.apache.spark.scheduler.ResultTask
简单来说,DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成ShuffleMapTask。生成的Task会被发送到已经启动的Executor上,由Executor来完成计算任务的执行,执行过程的实现在org.apache. spark.executor.Executor.TaskRunner#run。第6章会介绍这一部分的实现原理和设计思想。
3.4.2 Task的执行起点
org.apache.spark.scheduler.Task#run会 调 用ShuffleMapTask或 者ResultTask的runTask;runTask会调用RDD的org.apache.spark.rdd.RDD#iterator。计算由此开始。
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if(storageLevel != StorageLevel.NONE) {
//如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算
SparkEnv.get.cacheManager.getOrCompute(this, split, context,storageLevel)
} else {
//如果有checkpoint,那么直接读取结果;否则直接进行计算
computeOrReadCheckpoint(split, context)
}
}
其中,SparkEnv中包含了一个运行时节点所需要的所有的环境信息。cache-Manager是org.apache.spark.CacheManager,它负责调用BlockManager来管理RDD的缓存,如果当前RDD原来计算过并且把结果缓存起来,那么接下来的运算都可以通过BlockManager来直接读取缓存后返回。SparkEnv除了cacheManager,还包括以下重要的成员变量:
1)akka.actor.ActorSystem:运行在该节点的Actor System,其中运行在Driver上的名字是sparkDriver;运行在Executor上的是sparkExecutor。
2)org.apache.spark.serializer.Serializer:序列化和发序列化的工具。
3)org.apache.spark.MapOutputTracker;保存Shuffle Map Task输出的位置信息。其中在Driver上的Tracer是org.apache.spark.MapOutputTrackerMaster;而在Executor上的Tracker是org.apache.spark.MapOutputTrackerWorker,它会从org.apache.spark. MapOutputTrackerMaster获取信息。
4)org.apache.spark.shuffle.ShuffleManager:Shuffle的管理者,其中Driver端会注册Shuffle的信息,而Executor端会上报和获取Shuffle的信息。现阶段内置支持Hash Based Shuffle和Sort Based Shuffle,具体实现细节请参阅第7章。
5)org.apache.spark.broadcast.BroadcastManager:广播变量的管理者。
6)org.apache.spark.network.BlockTransferService:Executor读取Shuffle数据的Client。当前支持netty和nio,可以通过spark.shuffle.blockTransferService来设置。具体详情可以参阅第7章。
7)org.apache.spark.storage.BlockManager:提供了Storage模块与其他模块的交互接口,管理Storage模块。
8)org.apache.spark.SecurityManager:Spark对于认证授权的实现。
9)org.apache.spark.HttpFileServer:可以提供HTTP服务的Server。当前主要用于Executor端下载依赖。
10)org.apache.spark.metrics.MetricsSystem:用于搜集统计信息。
11)org.apache.spark.shuffle.ShuffleMemoryManager:管理Shuffle过程中使用的内存。ExternalAppendOnlyMap 和ExternalSorter都会从ShuffleMemoryManager中申请内存,在数据spill到Disk后会释放内存。当然了,当Task退出时这个内存也会被回收。为了使得每个thread都会比较公平地获取内存资源,避免一个thread申请了大量内存后造成其他的thread需要频繁地进行spill操作,它采取的内存分配策略是:对于N个thread,每个thread可以至少申请1/(2*N)的内存,但是至多申请1/N。这个N是动态变化的,感兴趣的读者可以查阅这个类的具体实现。
在用户创建org.apache.spark.SparkContext时会创建org.apache.spark.SparkEnv。
3.4.3 缓存的处理
如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算。什么是存储级别?从用户的角度来看就是缓存保存到不同的存储位置,比如内存、硬盘、Tachyon;还有缓存的数据是否需要序列化等。详细的存储级别的介绍可以参阅第8章。
cacheManager对Storage模块进行了封装,使得RDD可以更加简单地从Storage模块读取或者写入数据。RDD的每个Partition对应Storage模块的一个Block,只不过Block是Partition经过处理后的数据。在系统实现的层面上,可以认为Partition和Block是一一对应的。cacheManager会通过getOrCompute来判断当前的RDD是否需要进行计算。
首先,cacheManager会通过RDD的ID和当前计算的Partition的ID向Storage模块的BlockManager发起查询请求,如果能够获得Block的信息,会直接返回Block的信息。否则,代表该RDD是需要计算的。这个RDD以前可能计算过并且被存储到了内存中,但是后来由于内存紧张,这部分内存被清理了。在计算结束后,计算结果会根据用户定义的存储级别,写入BlockManager中。这样,下次就可以不经过计算而直接读取该RDD的计算结果了。核心实现逻辑如下:
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
//获取RDD的BlockId
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match { //向BlockManager查询是否有缓存
case Some(blockResult) => //缓存命中
//更新统计信息,将缓存作为结果返回
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
case None => //没有缓存命中,需要计算
// 判断当前是否有线程在处理当前的Partition,如果有那么等待它结束后,直接从Block
// Manager中读取处理结果如果没有线程在计算,那么storedValues就是None,否则
// 就是计算的结果
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) { // 已经被其他线程处理了,直接返回结果
return new InterruptibleIterator[T](context, storedValues.get)
}
// 需要计算
try {
// 如果被checkpoint过,那么读取checkpoint的数据;否则调用rdd的compute()开始
// 计算
val computedValues = rdd.computeOrReadCheckpoint(partition,context)
// Task是在Driver端执行的话就不需要缓存结果,这个主要是为了first() 或者take()
// 这种仅仅有一个执行阶段的任务的快速执行。这类任务由于没有Shuffle阶段,直接运行
// 在Driver端可能会更省时间
if (context.isRunningLocally) {
return computedValues
}
// 将计算结果写入到BlockManager
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues =
putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
// 更新任务的统计信息
val metrics = context.taskMetrics
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(
Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
loading.remove(key)
// 如果有其他的线程在等待该Partition的处理结果,那么通知它们计算已经完成,结果已
// 经存到BlockManager中(注意前面那类不会写入BlockManager的本地任务)
// loading.notifyAll()
}
}
}
}
3.4.4 checkpoint的处理
在缓存没有命中的情况下,首先会判断是否保存了RDD的checkpoint,如果有,则读取checkpoint。为了理解checkpoint的RDD是如何读取计算结果的,需要先看一下checkpoint的数据是如何写入的。
首先在Job结束后,会判断是否需要checkpoint。如果需要,就调用org.apache. spark.rdd.RDDCheckpointData#doCheckpoint。doCheckpoint首先为数据创建一个目录;然后启动一个新的Job来计算,并且将计算结果写入新创建的目录;接着创建一个org.apache.spark.rdd.CheckpointRDD;最后,原始RDD的所有依赖被清除,这就意味着RDD的转换的计算链(compute chain)等信息都被清除。这个处理逻辑中,数据写入的实现在org.apache.spark.rdd.CheckpointRDD$#writeToFile。简要的核心逻辑如下:
// 创建一个保存checkpoint数据的目录
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
if (!fs.mkdirs(path)) {
throw new SparkException("Failed to create checkpoint path " + path)
}
// 创建广播变量
val broadcastedConf = rdd.context.broadcast(
new SerializableWritable(rdd.context.hadoopConfiguration))
//开始一个新的Job进行计算,计算结果存入路径path中
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
//根据结果的路径path来创建CheckpointRDD
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
//保存结果,清除原始RDD的依赖、Partition信息等
RDDCheckpointData.synchronized {
cpFile = Some(path.toString)
cpRDD = Some(newRDD) // RDDCheckpointData对应的CheckpointRDD
rdd.markCheckpointed(newRDD) // 清除原始RDD的依赖,Partition
cpState = Checkpointed //标记checkpoint的状态为完成
}
至此,RDD的checkpoint完成,其中checkpoint的数据可以通过checkpointRDD的readFromFile读取。但是,上述逻辑在清除了RDD的依赖后,并没有和check-pointRDD建立联系,那么Spark是如何确定一个RDD是否被checkpoint了,而且正确读取checkpoint的数据呢?
答案就在org.apache.spark.rdd.RDD#dependencies的实现,它会首先判断当前的RDD是否已经Checkpoint过,如果有,那么RDD的依赖就变成了对应的CheckpointRDD:
privatedefcheckpointRDD: Option[RDD[T]]=checkpointData.flatMap(_.checkpointRDD)
final def dependencies: Seq[Dependency[_]] = {
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
if (dependencies_ == null) { //没有checkpoint
dependencies_ = getDependencies
}
dependencies_
}
}
理解了Checkpoint的实现过程,接下来看一下computeOrReadCheckpoint的实现。前面提到了,它一共在两个地方被调用,org.apache.spark.rdd.RDD#iterator和org.apache. spark.CacheManager#getOrCompute。它实现的逻辑比较简单,首先检查当前RDD是否被Checkpoint过,如果有,读取Checkpoint的数据;否则开始计算。实现如下:
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext)
: Iterator[T] =
{
if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}
firstParent[T].iterator(split,context)会调用对应CheckpointRDD的iterator,最终调用到它的compute:
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
val file=new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
CheckpointRDD.readFromFile(file, broadcastedConf, context) //读取Checkpoint的数据
}
3.4.5 RDD的计算逻辑
RDD的计算逻辑在org.apache.spark.rdd.RDD#compute中实现。每个特定的RDD都会实现compute。比如前面提到的CheckpointRDD的compute就是直接读取checkpoint数据。HadoopRDD就是读取指定Partition的数据。MapPartitionsRDD就是将用户的转换逻辑作用到指定的Partition上。