
2021SC@SDUSC
Creating and Starting DAGScheduler
DAGScheduler mainly performs preparatory work before tasks are formally handed to TaskSchedulerImpl for submission, including: creating the Job, dividing the RDDs in the DAG into different Stages, submitting Stages, and so on. DAGScheduler is created as follows:
@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)
DAGScheduler's data structures mainly maintain the mapping between jobId and stageId, the Stages, the ActiveJobs, and the cached locations of RDD partitions:
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]
// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]
private[scheduler] val activeJobs = new HashSet[ActiveJob]
// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
private val failedEpoch = new HashMap[String, Long]
private val dagSchedulerActorSupervisor =
  env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
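The bookkeeping above can be illustrated with a minimal stand-alone sketch (names like IdBookkeeping and newJob are hypothetical, not part of Spark): an AtomicInteger hands out monotonically increasing job and stage IDs, and jobIdToStageIds records which stages belong to which job.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical sketch of DAGScheduler-style ID allocation and the
// jobId -> stageIds mapping; not actual Spark code.
object IdBookkeeping {
  private val nextJobId = new AtomicInteger(0)
  private val nextStageId = new AtomicInteger(0)
  val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

  // Allocate a job ID, then register each of its new stages under it.
  def newJob(numStages: Int): Int = {
    val jobId = nextJobId.getAndIncrement()
    val stages = new HashSet[Int]
    (1 to numStages).foreach(_ => stages += nextStageId.getAndIncrement())
    jobIdToStageIds(jobId) = stages
    jobId
  }
}
```

Because both counters only ever increment, IDs are never reused, which is why numTotalJobs can simply read nextJobId.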
When DAGScheduler is constructed, the initializeEventProcessActor method is called to create the DAGSchedulerEventProcessActor:
private[scheduler] var eventProcessActor: ActorRef = _

private def initializeEventProcessActor() {
  // blocking the thread until supervisor is started, which ensures
  // eventProcessActor is not null before any job is submitted
  implicit val timeout = Timeout(30 seconds)
  val initEventActorReply =
    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}

initializeEventProcessActor()
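The essential pattern here is asking for an asynchronous reply and then blocking until it arrives, so later code can rely on the result being non-null. A minimal sketch of that pattern using only the Scala standard library (no Akka; AwaitSketch and the "actor-ref" placeholder are hypothetical stand-ins for the supervisor's reply):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch of the "send an async request, then block with a
// timeout until the reply arrives" pattern used by initializeEventProcessActor.
object AwaitSketch {
  def createWithTimeout(): String = {
    // Stands in for: dagSchedulerActorSupervisor ? Props(...)
    val reply: Future[String] = Future { "actor-ref" }
    // Blocks the calling thread, as the comment in the Spark source notes.
    Await.result(reply, 30.seconds)
  }
}
```

If the supervisor failed to start within the timeout, Await.result would throw, surfacing the failure at construction time rather than on the first job submission.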
Here, DAGSchedulerActorSupervisor acts as the supervisor of DAGSchedulerEventProcessActor and is responsible for creating it. As the first code listing below shows, DAGSchedulerActorSupervisor applies Akka's one-for-one supervision strategy to DAGSchedulerEventProcessActor. Once DAGSchedulerActorSupervisor has created DAGSchedulerEventProcessActor and registered it with the ActorSystem, the ActorSystem invokes DAGSchedulerEventProcessActor's preStart, at which point taskScheduler acquires a reference to dagScheduler; see the second code listing below. The second listing also shows the message types DAGSchedulerEventProcessActor can handle, such as JobSubmitted, BeginEvent, and CompletionEvent; on receiving each of these messages, it takes a different action.
private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override val supervisorStrategy =
    OneForOneStrategy() {
      case x: Exception =>
        logError("eventProcesserActor failed; shutting down SparkContext", x)
        try {
          dagScheduler.doCancelAllJobs()
        } catch {
          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
        }
        dagScheduler.sc.stop()
        Stop
    }

  def receive = {
    case p: Props => sender ! context.actorOf(p)
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
  }
}
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal,
        callSite, listener, properties)
    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)
    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)
    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)
    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)
    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def postStop() {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}
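The heart of the receive loop above is plain pattern matching: each event type is dispatched to a dedicated handler. A minimal, actor-free sketch of that dispatch style (the SchedulerEvent hierarchy and Dispatch object below are hypothetical, simplified stand-ins for Spark's DAGSchedulerEvent messages):

```scala
// Hypothetical, simplified event hierarchy mirroring how
// DAGSchedulerEventProcessActor routes JobSubmitted, JobCancelled, etc.
sealed trait SchedulerEvent
case class JobSubmittedEvt(jobId: Int) extends SchedulerEvent
case class JobCancelledEvt(jobId: Int) extends SchedulerEvent
case object AllJobsCancelledEvt extends SchedulerEvent

object Dispatch {
  // Pattern-match each event to its handler, as receive does;
  // returns the handler's name here instead of performing real work.
  def handle(e: SchedulerEvent): String = e match {
    case JobSubmittedEvt(id) => s"handleJobSubmitted($id)"
    case JobCancelledEvt(id) => s"handleJobCancellation($id)"
    case AllJobsCancelledEvt => "doCancelAllJobs()"
  }
}
```

Sealing the trait lets the compiler warn if a new event type is added without a matching case, which is why this style is a good fit for an actor's message loop.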