Review: How Spark divides a Job into Stages
1. Stage submission in Spark
1. As covered in the review above, the call chain that divides a Job into Stages starts in the method handleJobSubmitted; Stage submission is also driven from this same method, as shown below:
private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  // (1) Build the finalStage for this jobId -- see the post "How Spark divides a Job into Stages"
  // (2) Submit the Job -- see the post "Job submission in Spark"
  // (3) Submit the stages, but first recursively submit any missing parent stage(s),
  //     adding the missing stages to waitingStages
  // (4) Submit the TaskSet (tasks)

  // Submit the stage, recursively submitting any missing parent stage(s) first
  // (missing stages go into waitingStages); see 2 below
  submitStage(finalStage)
  // Check for waiting or failed stages; they will be resubmitted
  submitWaitingStages()
}
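To see where this code path starts, here is a minimal driver program (a sketch; the object name and app name are illustrative). The count() action submits a job, and the DAGScheduler's event loop eventually invokes handleJobSubmitted with the final RDD of the lineage; the shuffle introduced by reduceByKey is what splits the job into a ShuffleMapStage plus a final ResultStage.

import org.apache.spark.{SparkConf, SparkContext}

object StageSubmissionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("stage-demo").setMaster("local[2]"))
    // reduceByKey adds a shuffle dependency, so this job has a ShuffleMapStage
    // followed by a final ResultStage; count() is the action that submits the job.
    val n = sc.parallelize(Seq("a", "b", "a"))
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .count() // triggers DAGScheduler.handleJobSubmitted on the event loop
    println(n)
    sc.stop()
  }
}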
2. The submitStage method is shown below. Starting from finalStage, it calls itself recursively to submit the stages:

private def submitStage(stage: Stage) {
  // Find the jobId for this stage
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      // If no parent is missing, submit this stage's tasks
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get) // see 3 below
      } else {
        // Otherwise submit the missing parents first and park this stage
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
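The net effect of this recursion is a depth-first, parents-first walk of the stage DAG. The following self-contained sketch mimics that logic with a hypothetical SimpleStage class (not Spark's own types): a stage with missing parents is parked in waitingStages, and only stages whose parents are all available are submitted.

import scala.collection.mutable

// Hypothetical toy model of the submitStage recursion (not Spark's classes).
case class SimpleStage(id: Int, parents: Seq[SimpleStage], var available: Boolean = false)

object SubmitStageModel {
  val waitingStages = mutable.Set[SimpleStage]()
  val runningStages = mutable.Set[SimpleStage]()

  def missingParents(stage: SimpleStage): Seq[SimpleStage] =
    stage.parents.filterNot(_.available).sortBy(_.id)

  def submitStage(stage: SimpleStage): Unit = {
    if (!waitingStages(stage) && !runningStages(stage)) {
      val missing = missingParents(stage)
      if (missing.isEmpty) {
        println(s"Submitting stage ${stage.id}, which has no missing parents")
        runningStages += stage
      } else {
        missing.foreach(submitStage) // submit parents first
        waitingStages += stage       // park this stage until its parents finish
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val shuffleMap = SimpleStage(0, Seq.empty)
    val result = SimpleStage(1, Seq(shuffleMap))
    submitStage(result)
    // Only stage 0 runs now; stage 1 waits until stage 0's output is available,
    // at which point submitWaitingStages() would resubmit it.
    println(s"running=${runningStages.map(_.id)}, waiting=${waitingStages.map(_.id)}")
  }
}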
3. The submitMissingTasks method is shown below. It covers both the Stage submission and the TaskSet submission; for the latter, see the post "TaskSet (Tasks) submission in Spark".

private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingPartitions.clear()

  // First, determine the partition indices (ids) to compute, depending on whether
  // the stage is a ShuffleMapStage or a ResultStage
  val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
    stage match {
      case stage: ShuffleMapStage =>
        val allPartitions = 0 until stage.numPartitions
        val filteredPartitions = allPartitions.filter { id => stage.outputLocs(id).isEmpty }
        (allPartitions, filteredPartitions)
      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        val allPartitions = 0 until job.numPartitions
        val filteredPartitions = allPartitions.filter { id => !job.finished(id) }
        (allPartitions, filteredPartitions)
    }
  }

  // Reset the stage's internal accumulators if none have been initialized yet,
  // or if every partition needs to be computed
  if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
    stage.resetInternalAccumulators()
  }

  // Get the Job's properties
  val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull

  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after the corresponding
  // SparkListenerStageSubmitted event.
  outputCommitCoordinator.stageStart(stage.id)

  // Get the preferred locations for each partition of the RDD
  val taskIdToLocations = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
      case s: ResultStage =>
        val job = s.resultOfJob.get
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
      runningStages -= stage
      return
  }

  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  // Post the SparkListenerStageSubmitted event (received by JobProgressListener, among others)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  // ... the rest of the method serializes the tasks and submits the TaskSet;
  // see the post "TaskSet (Tasks) submission in Spark"
4. As mentioned earlier, job start is handled by JobProgressListener's onJobStart method; in the same way, stage submission is handled by its onStageSubmitted method, whose corresponding event type is SparkListenerStageSubmitted, as shown below:

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
  val stage = stageSubmitted.stageInfo
  activeStages(stage.stageId) = stage
  pendingStages.remove(stage.stageId)
  val poolName = Option(stageSubmitted.properties).map { p =>
    p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME)
  }.getOrElse(SparkUI.DEFAULT_POOL_NAME)

  stageIdToInfo(stage.stageId) = stage
  val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
  stageData.schedulingPool = poolName

  stageData.description = Option(stageSubmitted.properties).flatMap { p =>
    Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
  }

  val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo])
  stages(stage.stageId) = stage

  for (
    activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
    jobId <- activeJobsDependentOnStage;
    jobData <- jobIdToData.get(jobId)
  ) {
    jobData.numActiveStages += 1
    // If a stage is retried, it should be removed from the completedStageIndices set
    jobData.completedStageIndices.remove(stage.stageId)
  }
}

With that, Stage submission is complete; the next post looks at Task submission.
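As a practical aside, JobProgressListener is only one subscriber on the listener bus; any SparkListener receives the same SparkListenerStageSubmitted event. The sketch below registers a custom listener that logs each stage submission (the listener and object names are illustrative; SparkListener and addSparkListener are the actual Spark APIs):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

// Illustrative listener that logs every stage submission posted on the listener bus.
class StageLoggingListener extends SparkListener {
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    val info = stageSubmitted.stageInfo
    println(s"Stage submitted: id=${info.stageId}, name=${info.name}, tasks=${info.numTasks}")
  }
}

object StageListenerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("stage-listener").setMaster("local[2]"))
    sc.addSparkListener(new StageLoggingListener)
    sc.parallelize(1 to 10).map(_ * 2).count() // a single stage: no shuffle involved
    sc.stop()
  }
}

Running this locally prints one "Stage submitted" line per stage, which is a quick way to confirm the event flow described in this post.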