3.4 RDD的计算

3.4 RDD的计算

3.4.1 Ta s k简介

原始的RDD经过一系列转换后,会在最后一个RDD上触发一个动作,这个动作会生成一个Job。在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算。计算节点执行计算逻辑的部分称为Executor。Executor在准备好Task的运行时环境后,会通过调用org.apache.spark.scheduler.Task#run来执行计算。Spark的Task分为两种:

1)org.apache.spark.scheduler.ShuffleMapTask

2)org.apache.spark.scheduler.ResultTask

简单来说,DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成ShuffleMapTask。生成的Task会被发送到已经启动的Executor上,由Executor来完成计算任务的执行,执行过程的实现在org.apache. spark.executor.Executor.TaskRunner#run。第6章会介绍这一部分的实现原理和设计思想。

3.4.2 Task的执行起点

org.apache.spark.scheduler.Task#run会 调 用ShuffleMapTask或 者ResultTask的runTask;runTask会调用RDD的org.apache.spark.rdd.RDD#iterator。计算由此开始。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {

if(storageLevel != StorageLevel.NONE) {

//如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算

SparkEnv.get.cacheManager.getOrCompute(this, split, context,storageLevel)

} else {

//如果有checkpoint,那么直接读取结果;否则直接进行计算

computeOrReadCheckpoint(split, context)

}

}

其中,SparkEnv中包含了一个运行时节点所需要的所有的环境信息。cache-Manager是org.apache.spark.CacheManager,它负责调用BlockManager来管理RDD的缓存,如果当前RDD原来计算过并且把结果缓存起来,那么接下来的运算都可以通过BlockManager来直接读取缓存后返回。SparkEnv除了cacheManager,还包括以下重要的成员变量:

1)akka.actor.ActorSystem:运行在该节点的Actor System,其中运行在Driver上的名字是sparkDriver;运行在Executor上的是sparkExecutor。

2)org.apache.spark.serializer.Serializer:序列化和发序列化的工具。

3)org.apache.spark.MapOutputTracker;保存Shuffle Map Task输出的位置信息。其中在Driver上的Tracer是org.apache.spark.MapOutputTrackerMaster;而在Executor上的Tracker是org.apache.spark.MapOutputTrackerWorker,它会从org.apache.spark. MapOutputTrackerMaster获取信息。

4)org.apache.spark.shuffle.ShuffleManager:Shuffle的管理者,其中Driver端会注册Shuffle的信息,而Executor端会上报和获取Shuffle的信息。现阶段内置支持Hash Based Shuffle和Sort Based Shuffle,具体实现细节请参阅第7章。

5)org.apache.spark.broadcast.BroadcastManager:广播变量的管理者。

6)org.apache.spark.network.BlockTransferService:Executor读取Shuffle数据的Client。当前支持netty和nio,可以通过spark.shuffle.blockTransferService来设置。具体详情可以参阅第7章。

7)org.apache.spark.storage.BlockManager:提供了Storage模块与其他模块的交互接口,管理Storage模块。

8)org.apache.spark.SecurityManager:Spark对于认证授权的实现。

9)org.apache.spark.HttpFileServer:可以提供HTTP服务的Server。当前主要用于Executor端下载依赖。

10)org.apache.spark.metrics.MetricsSystem:用于搜集统计信息。

11)org.apache.spark.shuffle.ShuffleMemoryManager:管理Shuffle过程中使用的内存。ExternalAppendOnlyMap 和ExternalSorter都会从ShuffleMemoryManager中申请内存,在数据spill到Disk后会释放内存。当然了,当Task退出时这个内存也会被回收。为了使得每个thread都会比较公平地获取内存资源,避免一个thread申请了大量内存后造成其他的thread需要频繁地进行spill操作,它采取的内存分配策略是:对于N个thread,每个thread可以至少申请1/(2*N)的内存,但是至多申请1/N。这个N是动态变化的,感兴趣的读者可以查阅这个类的具体实现。

在用户创建org.apache.spark.SparkContext时会创建org.apache.spark.SparkEnv。

3.4.3 缓存的处理

如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算。什么是存储级别?从用户的角度来看就是缓存保存到不同的存储位置,比如内存、硬盘、Tachyon;还有缓存的数据是否需要序列化等。详细的存储级别的介绍可以参阅第8章。

cacheManager对Storage模块进行了封装,使得RDD可以更加简单地从Storage模块读取或者写入数据。RDD的每个Partition对应Storage模块的一个Block,只不过Block是Partition经过处理后的数据。在系统实现的层面上,可以认为Partition和Block是一一对应的。cacheManager会通过getOrCompute来判断当前的RDD是否需要进行计算。

首先,cacheManager会通过RDD的ID和当前计算的Partition的ID向Storage模块的BlockManager发起查询请求,如果能够获得Block的信息,会直接返回Block的信息。否则,代表该RDD是需要计算的。这个RDD以前可能计算过并且被存储到了内存中,但是后来由于内存紧张,这部分内存被清理了。在计算结束后,计算结果会根据用户定义的存储级别,写入BlockManager中。这样,下次就可以不经过计算而直接读取该RDD的计算结果了。核心实现逻辑如下:

def getOrCompute[T](

rdd: RDD[T],

partition: Partition,

context: TaskContext,

storageLevel: StorageLevel): Iterator[T] = {

//获取RDD的BlockId

val key = RDDBlockId(rdd.id, partition.index)

logDebug(s"Looking for partition $key")

blockManager.get(key) match { //向BlockManager查询是否有缓存

case Some(blockResult) => //缓存命中

//更新统计信息,将缓存作为结果返回

context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None => //没有缓存命中,需要计算

// 判断当前是否有线程在处理当前的Partition,如果有那么等待它结束后,直接从Block

// Manager中读取处理结果如果没有线程在计算,那么storedValues就是None,否则

// 就是计算的结果

val storedValues = acquireLockForPartition[T](key)

if (storedValues.isDefined) { // 已经被其他线程处理了,直接返回结果

return new InterruptibleIterator[T](context, storedValues.get)

}

// 需要计算

try {

// 如果被checkpoint过,那么读取checkpoint的数据;否则调用rdd的compute()开始

  // 计算

        val computedValues = rdd.computeOrReadCheckpoint(partition,context)

// Task是在Driver端执行的话就不需要缓存结果,这个主要是为了first() 或者take()

// 这种仅仅有一个执行阶段的任务的快速执行。这类任务由于没有Shuffle阶段,直接运行

// 在Driver端可能会更省时间

if (context.isRunningLocally) {

return computedValues

}

// 将计算结果写入到BlockManager

val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

val cachedValues =

putInBlockManager(key, computedValues, storageLevel, updatedBlocks)

// 更新任务的统计信息

val metrics = context.taskMetrics

val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(

Seq[(BlockId, BlockStatus)]())

metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)

new InterruptibleIterator(context, cachedValues)

} finally {

loading.synchronized {

loading.remove(key)

// 如果有其他的线程在等待该Partition的处理结果,那么通知它们计算已经完成,结果已

// 经存到BlockManager中(注意前面那类不会写入BlockManager的本地任务)

// loading.notifyAll()

}

}

}

}

3.4.4 checkpoint的处理

在缓存没有命中的情况下,首先会判断是否保存了RDD的checkpoint,如果有,则读取checkpoint。为了理解checkpoint的RDD是如何读取计算结果的,需要先看一下checkpoint的数据是如何写入的。

首先在Job结束后,会判断是否需要checkpoint。如果需要,就调用org.apache. spark.rdd.RDDCheckpointData#doCheckpoint。doCheckpoint首先为数据创建一个目录;然后启动一个新的Job来计算,并且将计算结果写入新创建的目录;接着创建一个org.apache.spark.rdd.CheckpointRDD;最后,原始RDD的所有依赖被清除,这就意味着RDD的转换的计算链(compute chain)等信息都被清除。这个处理逻辑中,数据写入的实现在org.apache.spark.rdd.CheckpointRDD$#writeToFile。简要的核心逻辑如下:

// 创建一个保存checkpoint数据的目录

val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)

val fs = path.getFileSystem(rdd.context.hadoopConfiguration)

if (!fs.mkdirs(path)) {

throw new SparkException("Failed to create checkpoint path " + path)

}

// 创建广播变量

val broadcastedConf = rdd.context.broadcast(

new SerializableWritable(rdd.context.hadoopConfiguration))

//开始一个新的Job进行计算,计算结果存入路径path中

rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)

//根据结果的路径path来创建CheckpointRDD

val newRDD = new CheckpointRDD[T](rdd.context, path.toString)

//保存结果,清除原始RDD的依赖、Partition信息等

RDDCheckpointData.synchronized {

cpFile = Some(path.toString)

cpRDD = Some(newRDD) // RDDCheckpointData对应的CheckpointRDD

rdd.markCheckpointed(newRDD)      // 清除原始RDD的依赖,Partition

cpState = Checkpointed            //标记checkpoint的状态为完成

}

至此,RDD的checkpoint完成,其中checkpoint的数据可以通过checkpointRDD的readFromFile读取。但是,上述逻辑在清除了RDD的依赖后,并没有和check-pointRDD建立联系,那么Spark是如何确定一个RDD是否被checkpoint了,而且正确读取checkpoint的数据呢?

答案就在org.apache.spark.rdd.RDD#dependencies的实现,它会首先判断当前的RDD是否已经Checkpoint过,如果有,那么RDD的依赖就变成了对应的CheckpointRDD:

privatedefcheckpointRDD: Option[RDD[T]]=checkpointData.flatMap(_.checkpointRDD)

final def dependencies: Seq[Dependency[_]] = {

checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {

if (dependencies_ == null) { //没有checkpoint

dependencies_ = getDependencies

}

dependencies_

}

}

理解了Checkpoint的实现过程,接下来看一下computeOrReadCheckpoint的实现。前面提到了,它一共在两个地方被调用,org.apache.spark.rdd.RDD#iterator和org.apache. spark.CacheManager#getOrCompute。它实现的逻辑比较简单,首先检查当前RDD是否被Checkpoint过,如果有,读取Checkpoint的数据;否则开始计算。实现如下:

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext)

: Iterator[T] =

{

if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)

}

firstParent[T].iterator(split,context)会调用对应CheckpointRDD的iterator,最终调用到它的compute:

override def compute(split: Partition, context: TaskContext): Iterator[T] = {

val file=new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))

CheckpointRDD.readFromFile(file, broadcastedConf, context) //读取Checkpoint的数据

}

3.4.5 RDD的计算逻辑

RDD的计算逻辑在org.apache.spark.rdd.RDD#compute中实现。每个特定的RDD都会实现compute。比如前面提到的CheckpointRDD的compute就是直接读取checkpoint数据。HadoopRDD就是读取指定Partition的数据。MapPartitionsRDD就是将用户的转换逻辑作用到指定的Partition上。