Spark (2)Spark内核

Spark内核:组件(Driver、Executor)通信、Job的调度(Stage调度、Task调度)

Yarn环境

  • Yarn Client

    • 过程:

      • 执行脚本提交任务(spark-submit),实际是启动一个 SparkSubmit 的 JVM 进程
      • SparkSubmit 类中的 main 方法反射调用用户代码的 main 方法
      • 启动 Driver 线程,执行用户的作业,创建 ScheduleBackend
      • YarnClientSchedulerBackend 向 RM 发送指令:bin/java ExecutorLauncher
      • Yarn 框架收到指令后在指定的 NM 中启动 ExecutorLauncher(还是调用 ApplicationMaster 的 main 方法)
      • AM 向 RM 注册,申请资源
      • 获取资源后 AM 向 NM 发送指令:bin/java CoarseGrainedExecutorBackend
      • CoarseGrainedExecutorBackend 进程接收消息,跟 Driver 通信,注册已经启动的 Executor;启动计算对象 Executor 等待接收任务
      • Driver 分配任务并监控任务的执行
    • SparkSubmit、ApplicationMaster 和 CoarseGrainedExecutorBackend 是独立的进程,Executor 和 Driver 是对象

      image-20220808190003976
  • Yarn Cluster模式

    • 过程:

      • 执行脚本提交任务,实际是启动一个 SparkSubmit 的 JVM 进程;
      • SparkSubmit 类中的 main 方法反射调用 YarnClusterApplication 的 main 方法
      • YarnClusterApplication 创建 Yarn 客户端,向 Yarn 服务器发送执行指令:bin/java ApplicationMaster
      • Yarn 框架收到指令后在指定的 NM 中启动 ApplicationMaster
      • AM 启动 Driver 线程,执行用户的作业
      • AM 向 RM 注册,申请资源
      • 获取资源后 AM 向 NM 发送指令:bin/java YarnCoarseGrainedExecutorBackend;
      • CoarseGrainedExecutorBackend 进程接收消息,跟 Driver 通信,注册已经启动的 Executor;启动计算对象 Executor 等待接收任务
      • Driver 线程继续执行,完成作业的调度和任务的执行
      • Driver 分配任务并监控任务的执行
    • SparkSubmit、ApplicationMaster 和 YarnCoarseGrainedExecutorBackend 是独立的进程;Driver 是独立的线程;Executor 和 YarnClusterApplication 是对象

      image-20220808184629202
  • 两个模式的区别在于driver的运行位置!

组件通信

  • Netty通信框架,AIO

    • AIO:异步非阻塞式IO,客户端在请求数据的过程中,不用保持一个连接,可以做其他事情。一个通知机制,客户端请求数据。服务端若有则返回数据,若无则客户端断开连接做其他事情,服务端有数据后主动通知客户端并返回数据
    • BIO:阻塞式IO,客户端在请求数据的过程中,保持一个连接,不能做其他事情。对于客户端和服务端而言,都需要一个线程来维护这个连接,如果服务端没有数据给客户端,则客户端需要一直等待
    • NIO:非阻塞式IO,客户端在请求数据的过程中,不用保持一个连接,不能做其他事情。客户端建立连接请求数据,服务端没有数据时客户端断开连接,一段时间后重新建立连接询问
    • 同步、异步:客户端在请求数据的过程中,能否做其他事情;阻塞、非阻塞:客户端与服务端是否始终有一个持续连接,占用通道
  • Linux采用Epoll方式模拟AIO

  • 通信环境:

    image-20220809123449398
  • 框架中各个组件(Client/Master/Worker)可以认为是独立的实体,各实体通过消息通信,Endpoint(Client/Master/Worker)有1个InBox和N个OutBox(N取决于该Endpoint与多少其他Endpoint通信),接收到的消息写入InBox,发送的消息写入OutBox并发到其他Endpoint的InBox

    image-20220809125238985
  • 终端driver的类:class DriverEndpoint extends IsolatedRpcEndpoint;executor的类:class CoarseGrainedExecutorBackend extends IsolatedRpcEndpoint

    image-20220809123545215
  • NettyRpcEnv:RPC 上下文环境,每个RPC终端运行时依赖的上下文环境

    • RpcEndpoint:RPC终端

      • 每个节点(Client/Master/Worker)都称为一个RPC终端,实现RpcEndpoint接口
      • 根据不同端点的需求,设计不同的消息和不同的业务处理
      • 需要发送(询问)则调用Dispatcher
    • Dispatcher:消息调度(分发)器,将RPC终端发送、接收的远程消息分发至对应的指令收件箱(发件箱)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      // class NettyRpcEnv
      private[netty] def send(message: RequestMessage): Unit = {
      // 获取接收者地址信息
      val remoteAddr = message.receiver.address
      if (remoteAddr == address) {
      // Message to a local RPC endpoint.
      // 把消息发送到本地的 RPC 端点 (发送到收件箱)
      try {
      dispatcher.postOneWayMessage(message)
      } catch {
      case e: RpcEnvStoppedException => logWarning(e.getMessage)
      }
      } else {
      // Message to a remote RPC endpoint.
      // 把消息发送到远程的 RPC 端点. (发送到发件箱)
      postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))
      }
      }
    • Inbox:指令消息收件箱

      • 一个RpcEndpoint对应一个收件箱
      • Dispatcher向Inbox存入消息时,将对应EndpointData加入内部ReceiverQueue
      • Dispatcher创建时会启动一个线程轮询ReceiverQueue,进行收件箱消息消费
    • RpcEndpointRef:对远程RpcEndpoint的一个引用。当需要向一个具体的RpcEndpoint发送消息时,需要获取该RpcEndpoint的引用,通过该引用发送消息

    • OutBox:指令消息发件箱

      • 一个目标RpcEndpoint对应一 个发件箱
      • 消息放入Outbox后,通过TransportClient发送消息,消息放入发件箱和发送在同一个线程中进行
    • RpcAddress:RpcEndpointRef的地址,Host + Port

    • TransportClient:Netty通信客户端

      • 一个OutBox对应一个TransportClient
      • TransportClient不断轮询OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer
    • TransportServer:Netty 通信服务端

      • 一个RpcEndpoint对应一个TransportServer
      • 接收远程消息后调用Dispatcher分发消息至收发件箱

Spark任务调度

  • Yarn-Cluster模式下,任务提交,启动Driver线程后,Driver初始化SparkContext对象,准备运行的上下文,保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源,同时根据用户业务逻辑调度任务,将任务下发到已有的空闲Executor。ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster尝试在对应的Container启动Executor进程,启动后Executor向Driver反向注册,保持与Driver的心跳,等待Driver分发任务,执行完分发任务会将任务状态上报给Driver

  • 一个Spark任务包括:

    • Job:以Action算子为界,遇到一个Action算子则触发一个Job

    • Stage:Job的子集,以RDD宽依赖(Shuffle)为界

    • Task:Stage的子集,以并行度(分区数)衡量,分区数多少则有多少个task

      image-20220809133132811

  • Spark RDD通过转换算子形成了RDD血缘(依赖)关系图DAG,通过Action的调用,触发Job并调度执行,执行中创建两个调度器:DAGScheduler、TaskScheduler

    • DAGScheduler:负责Stage级调度,将job切分成若干Stages,将每个Stage打包成TaskSet交给TaskScheduler调度
    • TaskScheduler:负责Task级调度,将TaskSet按指定的调度策略分发到Executor执行
    • 调度过程中SchedulerBackend提供可用资源,SchedulerBackend有多种实现,对应不同资源管理系统
  • Driver初始化SparkContext时:

    • 初始化DAGScheduler、TaskScheduler、SchedulerBackend(通信后台)、HeartbeatReceiver、SparkConf(配置对象)、SparkEnv(通信环境),启动SchedulerBackend、HeartbeatReceiver
    • SchedulerBackend通过ApplicationMaster申请资源,从TaskScheduler拿到合适的Task分发到Executor执行
    • HeartbeatReceiver负责接收Executor心跳信息,监控Executor存活状况,通知TaskScheduler

image-20220809133916210

Stage级调度

  • Job提交的方法调用流程:

    image-20220809134913443
  • Job由最终的RDD和Action算子封装成

  • SparkContext将Job交给DAGScheduler,DAGScheduler根据RDD的血缘关系构成的DAG划分多个Stage:

    • 由最终RDD不断通过依赖回溯判断父依赖是否是宽依赖,窄依赖的RDD之间划分到同一个Stage

    • 划分的Stages分两类: ResultStage(DAG最下游的Stage,由Action方法决定),ShuffleMapStage(为下游Stage准备数据)

      image-20220809140204033
    • 一个Stage是否被提交,需要判断父Stage是否执行,父Stage执行完毕才能提交当前Stage。Stage提交时会将Task信息(分区信息、算子等)序列化并打包成TaskSet交给TaskScheduler,一个Partition对应一个Task

    • TaskScheduler监控Stage的运行状态,只有Executor丢失或Task由于Fetch失败才重新提交失败的Stage,其他类型Task失败在TaskScheduler的调度中重试

Stage级调度的源码

  • SparkContext初始化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 用来与其他组件通讯用
    private var _schedulerBackend: SchedulerBackend = _
    // DAG 调度器, 是调度系统的中的中的重要组件之一, 负责创建 Job, 将 DAG 中的 RDD 划分到不同的 Stage, 提交 Stage 等.
    // SparkUI 中有关 Job 和 Stage 的监控数据都来自 DAGScheduler
    @volatile private var _dagScheduler: DAGScheduler = _
    // TaskScheduler 按照调度算法对集群管理器已经分配给应用程序的资源进行二次调度后分配给任务
    // TaskScheduler 调度的 Task 是由DAGScheduler 创建的, 所以 DAGScheduler 是 TaskScheduler的前置调度器
    private var _taskScheduler: TaskScheduler = _

    // 创建 SchedulerBackend 和 TaskScheduler
    val (sched, ts):(SchedulerBackend, TaskScheduler) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    // 创建 DAGScheduler
    _dagScheduler = new DAGScheduler(this)
    // 启动 TaskScheduler, 内部会也会启动 SchedulerBackend
    _taskScheduler.start()
  • Action算子的runJob方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
    }

    def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
    // 作业的切分
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    }
  • DAGScheduler类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
    // 提交任务 返回值 JobWaiter 用来确定 Job 的成功与失败
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    }


    def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
    // 创建 JobWaiter 对象
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // 向内部事件循环器发送消息 JobSubmitted
    eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
    waiter
    }

    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    // 处理提交的 Job
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    }

    private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {

    var finalStage: ResultStage = null
    try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Stage 的划分是从后向前推断的, 所以先创建最后的阶段
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {

    }

    submitStage(finalStage)
    }

    private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
    // 1. 获取所有父 Stage 的列表
    val parents: List[Stage] = getOrCreateParentStages(rdd, jobId)
    // 2. 给 resultStage 生成一个 id
    val id = nextStageId.getAndIncrement()
    // 3. 创建 ResultStage
    val stage: ResultStage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    // 4. stageId 和 ResultStage 做映射
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
    }

    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    // 获取所有的 Shuffle 依赖(宽依赖)
    getShuffleDependencies(rdd).map { shuffleDep =>
    // 对每个 shuffle 依赖, 获取或者创建新的 Stage: ShuffleMapStage
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
    }

    // 得到所有宽依赖
    private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
    visited += toVisit
    toVisit.dependencies.foreach {
    case shuffleDep: ShuffleDependency[_, _, _] =>
    parents += shuffleDep
    case dependency =>
    waitingForVisit.push(dependency.rdd)
    }
    }
    }
    parents
    }

    private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {

    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
    // 获取所有的父 Stage
    val missing = getMissingParentStages(stage).sortBy(_.id)
    // 如果为空, 则提交这个 Stage
    if (missing.isEmpty) {
    submitMissingTasks(stage, jobId.get)
    } else { // 如果还有父 Stage , 则递归调用
    for (parent <- missing) {
    submitStage(parent)
    }
    waitingStages += stage
    }
    }
    } else {

    }
    }

    private def submitMissingTasks(stage: Stage, jobId: Int) {
    // 任务划分. 每个分区创建一个 Task
    val tasks: Seq[Task[_]] = try {
    stage match {
    case stage: ShuffleMapStage =>
    partitionsToCompute.map { id =>
    val locs = taskIdToLocations(id)
    val part = stage.rdd.partitions(id)
    new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
    taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
    Option(sc.applicationId), sc.applicationAttemptId)
    }

    case stage: ResultStage =>
    partitionsToCompute.map { id =>
    val p: Int = stage.partitions(id)
    val part = stage.rdd.partitions(p)
    val locs = taskIdToLocations(id)
    new ResultTask(stage.id, stage.latestInfo.attemptId,
    taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
    Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
    }
    }
    } catch {

    }
    // 提交任务
    if (tasks.size > 0) {

    // 使用 taskScheduler 提交任务
    taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    } else {

    }
    }
  • TaskScheduler类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks

    this.synchronized {
    // 创建 TaskManger 对象. 用来追踪每个任务
    val manager: TaskSetManager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId

    // manage 和 TaskSet 交给合适的任务调度器来调度
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    }
    // 跟 ExecutorBackend 通讯
    backend.reviveOffers()
    }

  • CoarseGrainedSchedulerBackend

    1
    2
    3
    4
    override def reviveOffers() {
    // DriverEndpoint 给自己发信息: ReviveOffers
    driverEndpoint.send(ReviveOffers)
    }
  • DriverEndpoint

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    private def makeOffers() {
    // 过滤出 Active 的Executor
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    // 封装资源
    val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    // 启动任务
    launchTasks(scheduler.resourceOffers(workOffers))
    }

    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
    for (task <- tasks.flatten) {
    // 序列化任务
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= maxRpcMessageSize) {

    }
    else {

    val executorData = executorDataMap(task.executorId)
    executorData.freeCores -= scheduler.CPUS_PER_TASK

    // 发送任务到 Executor. CoarseGrainedExecutorBackend 会收到消息
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
    }
    }
  • CoarseGrainedExecutorBackend

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    override def receive: PartialFunction[Any, Unit] = {
    //
    case LaunchTask(data) =>
    if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
    // 把要执行的任务反序列化
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    // 启动任务开始执行
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
    taskDesc.name, taskDesc.serializedTask)
    }
    }

Task级调度

  • Task的调度由TaskScheduler完成,TaskScheduler将TaskSet封装为TaskSetManager,加入调度队列——TaskSetManager监控管理同一个Stage中的Tasks,TaskScheduler以TaskSetManager为单元调度任务

    image-20220809142905732

  • Driver初始化TaskScheduler后,会启动SchedulerBackend,接收Executor注册信息、维护Executor状态、定期询问TaskScheduler是否有任务运行

  • TaskScheduler在SchedulerBackend询问时,从调度队列按照指定的调度策略选择TaskSetManager去调度运行

    image-20220809143144045
    • 将TaskSetManager加入rootPool调度池后,调用SchedulerBackend.reviveOffers给driverEndpoint发送ReviveOffer消息
    • driverEndpoint收到ReviveOffer消息后调用makeOffers方法,过滤得到活跃状态的Executor,将Executor封装成WorkerOffer对象
    • 准备好计算资源 (WorkerOffer)后,taskScheduler基于计算资源调用resourceOffer在Executor上分配task

调度策略

  • TaskScheduler支持两种调度策略:FIFO(默认的调度策略)、FAIR

  • TaskScheduler初始化中会实例化rootPool(树的根节点,Pool类型)

  • FIFO 调度策略:TaskSetManager按照先来先到的方式入队,树结构如下所示

    image-20220809145426237
  • FAIR 调度策略:树结构如下所示

    image-20220809151025413
    • 一个rootPool和多个子Pool,子Pool存储待分配的TaskSetMagager

    • 先对子Pool排序,再对子Pool中的TaskSetMagager排序(排序算法相同)

      • 排序过程的比较基于Fair-share,每个排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比较时综合考量runningTasks 值,minShare值、weight值(minShare使用率和权重使用率少——实际运行 task比例较少的先运行)

      • minShare、weight的值均在公平调度配置文件fairscheduler.xml指定,调度池在构建阶段读取此文件的相关配置

        1. 如果A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare, B排在A前面(runningTasks比minShare小的先执行)

        2. A、B 对象的runningTasks都小于各自的minShare,比较runningTasks与minShare的比值(minShare使用率),minShare使用率低的先执行

        3. A、B 对象的runningTasks都大于各自的minShare,比较runningTasks与weight的比值(权重使用率),权重使用率低的先执行

        4. 如果上述比较均相等,则比较名字

    • 排序完成后,所有的TaskSetManager放入一个ArrayBuffer,依次被取出并发送给Executor执行

  • 从调度队列中拿到TaskSetManager后,TaskSetManager按照一定的规则取出Task给TaskScheduler,TaskScheduler将Task交给SchedulerBackend发到Executor执行

失败重试与黑名单机制

  • Task提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend,SchedulerBackend告诉TaskScheduler,TaskScheduler找到该Task对应的TaskSetManager并通知,从而TaskSetManager知道Task的失败与成功状态
  • 对于失败的Task,TaskSetManager记录失败的次数,如果次数没有超过最大重试次数,则放回待调度的Task池中,否则整个Application失败
  • 记录Task失败次数过程,会记录Task上一次失败所在的Executor Id和Host,以及其对应的“拉黑”时间,下次再调度该Task时,使用黑名单机制,避免它被调度到上一次失败的节点

Task级调度源码

  • FIFO调度

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
    // 是不是先调度 s1
    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
    val stageId1 = s1.stageId
    val stageId2 = s2.stageId
    res = math.signum(stageId1 - stageId2)
    }
    res < 0 // 值小的先调度
    }
    }
  • Fair调度

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

    var compare = 0
    if (s1Needy && !s2Needy) { // 谁的 runningTasks1 < minShare1 谁先被调度
    return true
    } else if (!s1Needy && s2Needy) {
    return false
    } else if (s1Needy && s2Needy) { // 如果都 runningTasks < minShare
    // 则比较 runningTasks / math.max(minShare1, 1.0) 的比值 小的优先级高
    compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
    // 如果都runningTasks > minShare, 则比较 runningTasks / weight 的比值
    // 小的优先级高
    compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
    true
    } else if (compare > 0) {
    false
    } else {
    // 如果前面都一样, 则比较 TaskSetManager 或 Pool 的名字
    s1.name < s2.name
    }
    }
    }