这一节介绍具体task的运行和终究成果的处置
看线程运行的run办法,见代码注释
override def run(): Unit = { val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager) val deserializeStartTime = System.currentTimeMillis() Thread.currentThread.setContextClassLoader(replClassLoader) val ser = env.closureSerializer.newInstance() logInfo(s"Running $taskName (TID $taskId)") //这个就是就是向Driver发送StatusUpdate办法,状况是RUNNING,其实不做甚么操作的 execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) var taskStart: Long = 0 startGCTime = computeTotalGcTime() try { //将serializedTask解析出来 val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) //下载运行task须要的jar,文件等 updateDependencies(taskFiles, taskJars) //把真实的task反序列化出来 task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) task.setTaskMemoryManager(taskMemoryManager) // If this task has been killed before we deserialized it, let"s quit now. Otherwise, // continue executing the task. if (killed) { // Throw an exception rather than returning, because returning within a try{} block // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl // exception will be caught by the catch block, leading to an incorrect ExceptionFailure // for the task. throw new TaskKilledException } logDebug("Task " + taskId + ""s epoch is " + task.epoch) env.mapOutputTracker.updateEpoch(task.epoch) // Run the actual task and measure its runtime. taskStart = System.currentTimeMillis() val value = try { //义务履行,见下面解析 task.run(taskAttemptId = taskId, attemptNumber = attemptNumber) } finally { // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread; // when changing this, make sure to update both copies. //释放内存 val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory() if (freedMemory > 0) { val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId" if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) { throw new SparkException(errMsg) } else { logError(errMsg) } } } val taskFinish = System.currentTimeMillis() // If the task has been killed, let"s fail it. if (task.killed) { throw new TaskKilledException } val resultSer = env.serializer.newInstance() val beforeSerialization = System.currentTimeMillis() //将task运行成果序列化 val valueBytes = resultSer.serialize(value) val afterSerialization = System.currentTimeMillis() for (m <- task.metrics) { // Deserialization happens in two parts: first, we deserialize a Task object, which // includes the Partition. Second, Task.run() deserializes the RDD and function to be run. m.setExecutorDeserializeTime( (taskStart - deserializeStartTime) + task.executorDeserializeTime) // We need to subtract Task.run()"s deserialization time to avoid double-counting m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime) m.setJvmGCTime(computeTotalGcTime() - startGCTime) m.setResultSerializationTime(afterSerialization - beforeSerialization) } val accumUpdates = Accumulators.values val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit //这里将终究成果序列化成serializedDirectResult,并依据这个序列化以后的大小辨别处置 // directSend = sending directly back to the driver val serializedResult: ByteBuffer = { //终究成果序列化以后>1G if (maxResultSize > 0 && resultSize > maxResultSize) { logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " + s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " + s"dropping it.") ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize)) } //终究成果序列化以后>10M,把序列化的成果作为一个Block寄存在BlockManager里,而后将BlockManager返回的BlockID放在IndirectTaskResult对象中 else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) logInfo( s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)") ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) } else { //小数据可以直接处置 logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver") serializedDirectResult } } execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) } catch { case ffe: FetchFailedException => val reason = ffe.toTaskEndReason execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) case _: TaskKilledException | _: InterruptedException if task.killed => logInfo(s"Executor killed $taskName (TID $taskId)") execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) case cDE: CommitDeniedException => val reason = cDE.toTaskEndReason execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) case t: Throwable => // Attempt to exit cleanly by informing the driver of our failure. // If anything goes wrong (or this was a fatal exception), we will delegate to // the default uncaught exception handler, which will terminate the Executor. logError(s"Exception in $taskName (TID $taskId)", t) val metrics: Option[TaskMetrics] = Option(task).flatMap { task => task.metrics.map { m => m.setExecutorRunTime(System.currentTimeMillis() - taskStart) m.setJvmGCTime(computeTotalGcTime() - startGCTime) m } } val taskEndReason = new ExceptionFailure(t, metrics) execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(taskEndReason)) // Don"t forcibly exit unless the exception was inherently fatal, to avoid // stopping other tasks unnecessarily. if (Utils.isFatalError(t)) { SparkUncaughtExceptionHandler.uncaughtException(t) } } finally { // Release memory used by this thread for shuffles env.shuffleMemoryManager.releaseMemoryForThisThread() // Release memory used by this thread for unrolling blocks env.blockManager.memoryStore.releaseUnrollMemoryForThisThread() // Release memory used by this thread for accumulators Accumulators.clear() runningTasks.remove(taskId) } } } override