阅读背景:

spark core源码分析10 Task的运行

来源:互联网 

这一节介绍具体task的运行以及最终结果的处理。

下面看线程运行的run方法,说明见代码注释:

/**
  * Executes one task attempt on this executor thread and reports its outcome to the driver.
  *
  * Steps: deserialize the task and fetch its file/jar dependencies, run it, free the memory
  * tracked by its TaskMemoryManager, serialize the result, and send a final status update
  * (FINISHED, FAILED or KILLED) through `execBackend`. Per-thread shuffle/unroll memory,
  * accumulators and this task's `runningTasks` entry are released in the outer `finally`
  * regardless of outcome.
  */
override def run(): Unit = {
    val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
    val deserializeStartTime = System.currentTimeMillis()
    Thread.currentThread.setContextClassLoader(replClassLoader)
    val ser = env.closureSerializer.newInstance()
    logInfo(s"Running $taskName (TID $taskId)")
    // Tell the driver (through execBackend) that this task is now RUNNING; no payload yet.
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    // Stays 0 until the task body actually starts; the failure path checks this before
    // reporting a run time.
    var taskStart: Long = 0
    startGCTime = computeTotalGcTime()

    try {
      // Unpack serializedTask into its file dependencies, jar dependencies and task bytes.
      val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
      // Fetch the files/jars the task needs before it can be deserialized and run.
      updateDependencies(taskFiles, taskJars)
      // Deserialize the actual Task object.
      task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
      task.setTaskMemoryManager(taskMemoryManager)

      // If this task has been killed before we deserialized it, let's quit now. Otherwise,
      // continue executing the task.
      if (killed) {
        // Throw an exception rather than returning, because returning within a try{} block
        // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
        // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
        // for the task.
        throw new TaskKilledException
      }

      logDebug(s"Task $taskId's epoch is ${task.epoch}")
      env.mapOutputTracker.updateEpoch(task.epoch)

      // Run the actual task and measure its runtime.
      taskStart = System.currentTimeMillis()
      val value = try {
        task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
      } finally {
        // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
        // when changing this, make sure to update both copies.
        // Any memory still tracked by the task's TaskMemoryManager at this point is a leak:
        // free it, then either fail hard or just log, depending on configuration.
        val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
        if (freedMemory > 0) {
          val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
          if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
            throw new SparkException(errMsg)
          } else {
            logError(errMsg)
          }
        }
      }
      val taskFinish = System.currentTimeMillis()

      // If the task has been killed, let's fail it.
      if (task.killed) {
        throw new TaskKilledException
      }

      val resultSer = env.serializer.newInstance()
      val beforeSerialization = System.currentTimeMillis()
      // Serialize the value returned by the task.
      val valueBytes = resultSer.serialize(value)
      val afterSerialization = System.currentTimeMillis()

      for (m <- task.metrics) {
        // Deserialization happens in two parts: first, we deserialize a Task object, which
        // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
        m.setExecutorDeserializeTime(
          (taskStart - deserializeStartTime) + task.executorDeserializeTime)
        // We need to subtract Task.run()'s deserialization time to avoid double-counting
        m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
        m.setJvmGCTime(computeTotalGcTime() - startGCTime)
        m.setResultSerializationTime(afterSerialization - beforeSerialization)
      }

      val accumUpdates = Accumulators.values
      val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
      val serializedDirectResult = ser.serialize(directResult)
      // The size of the fully serialized DirectTaskResult decides how it is shipped below.
      val resultSize = serializedDirectResult.limit

      // directSend = sending directly back to the driver
      val serializedResult: ByteBuffer = {
        if (maxResultSize > 0 && resultSize > maxResultSize) {
          // Larger than maxResultSize (the article cites 1 GB — presumably the
          // spark.driver.maxResultSize default; not visible here): the result bytes are dropped.
          // Only an IndirectTaskResult with the block id and size is sent; nothing is stored in
          // the BlockManager on this branch.
          logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
            s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
            s"dropping it.")
          ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
        } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          // Too large for one Akka frame (the article cites 10 MB — presumably the
          // spark.akka.frameSize default): store the serialized result as a block in the
          // BlockManager and send back an IndirectTaskResult that references that block id.
          val blockId = TaskResultBlockId(taskId)
          env.blockManager.putBytes(
            blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
          logInfo(
            s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
          ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
        } else {
          // Small result: send the serialized DirectTaskResult to the driver directly.
          logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
          serializedDirectResult
        }
      }

      execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

    } catch {
      case ffe: FetchFailedException =>
        val reason = ffe.toTaskEndReason
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      case _: TaskKilledException =>
        // Every TaskKilledException raised in this method follows a kill check. The check made
        // before deserialization uses this runner's own `killed` flag, and a kill that arrived
        // before the Task object existed cannot have marked that freshly deserialized object.
        // So `task.killed` may be false here; guarding on it would misreport the kill as an
        // ExceptionFailure (the very outcome the throw above is meant to avoid).
        logInfo(s"Executor killed $taskName (TID $taskId)")
        execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

      case _: InterruptedException if task != null && task.killed =>
        // An interrupt counts as a kill only if the task was marked killed. `task` is still null
        // if the interrupt hit during dependency download or deserialization; dereferencing it
        // in the guard would throw an NPE out of this catch and skip the status update.
        logInfo(s"Executor killed $taskName (TID $taskId)")
        execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

      case cDE: CommitDeniedException =>
        val reason = cDE.toTaskEndReason
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      case t: Throwable =>
        // Attempt to exit cleanly by informing the driver of our failure.
        // If anything goes wrong (or this was a fatal exception), we will delegate to
        // the default uncaught exception handler, which will terminate the Executor.
        logError(s"Exception in $taskName (TID $taskId)", t)

        val metrics: Option[TaskMetrics] = Option(task).flatMap { task =>
          task.metrics.map { m =>
            // taskStart is still 0 if we failed before the task body started; report 0 instead
            // of "now minus the epoch".
            val runTime = if (taskStart > 0) System.currentTimeMillis() - taskStart else 0L
            m.setExecutorRunTime(runTime)
            m.setJvmGCTime(computeTotalGcTime() - startGCTime)
            m
          }
        }
        val taskEndReason = new ExceptionFailure(t, metrics)
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(taskEndReason))

        // Don't forcibly exit unless the exception was inherently fatal, to avoid
        // stopping other tasks unnecessarily.
        if (Utils.isFatalError(t)) {
          SparkUncaughtExceptionHandler.uncaughtException(t)
        }

    } finally {
      // Release memory used by this thread for shuffles
      env.shuffleMemoryManager.releaseMemoryForThisThread()
      // Release memory used by this thread for unrolling blocks
      env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
      // Release memory used by this thread for accumulators
      Accumulators.clear()
      runningTasks.remove(taskId)
    }
  }
}
override




你的当前访问异常,请进行认证后继续阅读剩余内容。

分享到: