Preface
Our team's Flink cluster currently runs version 1.7.2 in per-job on YARN mode. Over the past year or so we have run into memory-related problems more than once, for example: `beyond the 'PHYSICAL' memory limit... Killing container.`, and Flink Streaming's memory management has always felt less than solid in real-world scenarios. After Flink 1.10 was released, I learned that it redesigned the `TaskExecutor` memory configuration and wanted to dig into it. Reading the community docs, however, only raised more questions. For instance: when the `TaskExecutor` JVM process starts, only three memory-related parameters, `-Xmx -Xms -XX:MaxDirectMemorySize`, are computed by Flink. How much do the newly added fine-grained options actually change these three startup parameters, or are they merely a convenient tool for memory calculation? For someone already familiar with Flink's memory model, the old configuration options can achieve the same effect as the new ones.
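To make that question concrete, here is a minimal sketch of how the fine-grained `TaskExecutor` options introduced in 1.10 roughly feed those JVM flags. This is my own simplified reading of the documented memory model, not the actual `TaskExecutorProcessUtils` code; the option keys in the comments and the example sizes are assumptions for illustration:

```java
// Simplified sketch of the Flink 1.10+ TaskExecutor memory model (my reading, not Flink code).
// All values in MB; the fine-grained options (comments) feed the JVM flags computed below.
public class TaskExecutorMemorySketch {
    public static void main(String[] args) {
        long frameworkHeap    = 128;  // taskmanager.memory.framework.heap.size
        long taskHeap         = 2048; // taskmanager.memory.task.heap.size
        long frameworkOffHeap = 128;  // taskmanager.memory.framework.off-heap.size
        long taskOffHeap      = 0;    // taskmanager.memory.task.off-heap.size
        long network          = 512;  // taskmanager.memory.network.(min|max|fraction)
        long managed          = 1024; // taskmanager.memory.managed.size (off-heap, outside the JVM flags)
        long metaspace        = 256;  // taskmanager.memory.jvm-metaspace.size
        long overhead         = 512;  // taskmanager.memory.jvm-overhead.(min|max|fraction)

        long xmx       = frameworkHeap + taskHeap;                    // -Xmx / -Xms
        long maxDirect = frameworkOffHeap + taskOffHeap + network;    // -XX:MaxDirectMemorySize
        long process   = xmx + maxDirect + managed + metaspace + overhead; // taskmanager.memory.process.size

        System.out.printf("-Xmx%dm -Xms%dm -XX:MaxDirectMemorySize=%dm (container size ~%dm)%n",
                xmx, xmx, maxDirect, process);
    }
}
```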
Originally this article was planned as a comparison of the old and new Flink Streaming memory management. My spare time afterwards, however, was eaten up by learning Rust, so I only found time to start at the end of June. While reading the memory-management code I had also read some of the job-startup code along the way, so the scope has been broadened: the article mainly walks through the whole Flink Streaming startup flow, supplemented by an analysis of the memory allocation/management code. The Flink version read here is the latest release, 1.11.0.
Startup Flow Analysis
Startup flow: the whole process from script submission to the job running successfully (the job status shown in the Flink Web Dashboard is `running`). It can roughly be divided into 3 stages:

- Client side: package the job information and submit it to YARN, register callback functions, and poll the job status;
- YARN allocates a Container and starts the AppMaster;
- The AppMaster requests resources from YARN and starts the TaskManagers;

Example submission script:
// Per-job mode
flink run -m yarn-cluster -yn 24 -ys 2 -ytm 6g -ynm $job_name -c $main_class -d -yq ./$job_jar $params
StreamGraph:
graph LR
A(KafkaSource) --> B(MapOperator)
B --> C(KafkaSink)
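The walkthrough below assumes a job of exactly this shape (Kafka source, map, Kafka sink). For reference, a minimal sketch of such a user job; the class name matches the `UserMainClass` placeholder used later, while the topic names and properties are made up for illustration:

```java
// Hypothetical UserMainClass matching the KafkaSource -> MapOperator -> KafkaSink StreamGraph above.
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class UserMainClass {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "demo-group");

        env.addSource(new FlinkKafkaConsumer<>("in-topic", new SimpleStringSchema(), props))  // KafkaSource
           .map(String::toUpperCase)                                                          // MapOperator
           .addSink(new FlinkKafkaProducer<>("out-topic", new SimpleStringSchema(), props));  // KafkaSink

        env.execute("demo-job"); // triggers StreamGraph generation and submission
    }
}
```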
[Client] Initializing the Job
// Client-side source-code call flow
[1] -> CliFrontend::main(String[] args)
-> -> CliFrontend.parseParameters
-> -> -> CliFrontend.run
[2] -> UserMainClass::main(args: Array[String])
-> -> StreamExecutionEnvironment.getExecutionEnvironment
-> -> StreamExecutionEnvironment.addSource
-> -> DataStream.map
-> -> DataStream.addSink
[3] -> -> StreamExecutionEnvironment.execute
-> -> -> StreamExecutionEnvironment.getStreamGraph
-> -> -> -> StreamExecutionEnvironment.getStreamGraphGenerator
-> -> -> -> StreamGraphGenerator.generate
-> -> -> StreamExecutionEnvironment.execute(StreamGraph)
-> -> -> -> StreamExecutionEnvironment.executeAsync
-> -> -> -> -> DefaultExecutorServiceLoader.getExecutorFactory
-> -> -> -> -> YarnJobClusterExecutorFactory.getExecutor
-> -> -> -> -> YarnJobClusterExecutor.execute
-> -> -> -> -> -> PipelineExecutorUtils::getJobGraph
-> -> -> -> -> -> YarnClusterClientFactory.createClusterDescriptor
[4] -> -> -> -> -> -> YarnClusterDescriptor.deployJobCluster
-> -> -> -> -> -> -> YarnClusterDescriptor.deployInternal
-> -> -> -> -> -> -> -> YarnClusterDescriptor.startAppMaster
-> -> -> -> -> -> -> -> -> YarnClientImpl.submitApplication
-> -> -> -> -> -> return CompletableFuture(new ClusterClientJobClientAdapter)
-> -> -> -> -> [action] get `JobClient`, invoke `JobListener`, return `JobClient`
-> -> -> -> [action] create `JobExecutionResult`, invoke `JobListener`, return `JobExecutionResult`
1. The job submission script triggers `org.apache.flink.client.cli.CliFrontend::main(String[] args)`, which then executes, in order:
   - call `EnvironmentInformation::logEnvironmentInfo` to log the JVM context/environment information;
   - call `GlobalConfiguration::loadConfiguration` to load the `flink-conf.yaml` configuration;
   - call `CliFrontend.loadCustomCommandLines`, which returns a `List<CustomCommandLine>` made up of `GenericCLI`, `FlinkYarnSessionCli` and `DefaultCLI`; `FlinkYarnSessionCli` contains the options related to the `yarn-cluster` submission mode that can be passed on the command line, e.g. `-yid, -ynm, -yqu`, etc.;
   - call `CliFrontend.new` to create the `CliFrontend` object; it uses the `List<CustomCommandLine>` from the `loadCustomCommandLines` step to build the `Options` list used to parse the command-line arguments, and assigns it to the member variable `customCommandLineOptions`;
   - call `CliFrontend.parseParameters`, match the first command argument `run`, and dispatch to the subsequent handler;
   - call `CliFrontend.run`:
     - parse the command arguments and wrap them into a `CommandLine` object;
     - create a `ProgramOptions` object, the wrapper class for the job's command-line arguments, holding the values parsed from the `CommandLine`;
     - pass the `ProgramOptions` into the constructor of `PackagedProgram` to create the `PackagedProgram` object; `PackagedProgram` is responsible for actually invoking the user jar's main function;
     - execute `CliFrontend.executeProgram`, which creates the `ExecutionEnvironmentFactory` and `StreamExecutionEnvironmentFactory` objects for the job's execution context, then calls `PackagedProgram.invokeInteractiveModeForExecution` to invoke the user jar's `main` function via reflection and run the actual job logic (see the sketch below);
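Conceptually, `invokeInteractiveModeForExecution` boils down to a plain reflective call of the user's `main` method with the program arguments. A simplified sketch of that mechanism (not Flink's actual implementation, which additionally manages the user-code class loader and exception translation):

```java
// Simplified illustration of reflectively invoking a user jar's entry point.
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

public class InvokeUserMain {
    public static void run(URL userJar, String mainClassName, String[] programArgs) throws Exception {
        try (URLClassLoader userCodeClassLoader =
                     new URLClassLoader(new URL[]{userJar}, InvokeUserMain.class.getClassLoader())) {
            Class<?> mainClass = Class.forName(mainClassName, true, userCodeClassLoader);
            Method main = mainClass.getMethod("main", String[].class);
            // Static method, hence the null receiver; the cast keeps varargs from splitting the array.
            main.invoke(null, (Object) programArgs);
        }
    }
}
```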
2. After the user jar's entry point `UserMainClass::main(args: Array[String])` is invoked, it executes, in order:
   - call `StreamExecutionEnvironment::getExecutionEnvironment` to create the `StreamExecutionEnvironment` and `StreamContextEnvironment` objects;
   - call `StreamExecutionEnvironment.addSource` to create a `DataStreamSource` object; the `DataStreamSource` holds the `StreamExecutionEnvironment` from the context and a `StreamTransformation` object, and the `StreamTransformation` holds the `FlinkKafkaConsumer` object;
   - call `DataStream.map` to create a `OneInputTransformation` object, which holds the upstream `StreamTransformation` and the `StreamExecutionEnvironment` from the context; the `OneInputTransformation` is then appended to the `transformations` list, a member variable of `StreamExecutionEnvironment`;
   - call `DataStream.addSink` to create a `DataStreamSink` object and append it to the `transformations` list of `StreamExecutionEnvironment`;
   - finally, call `StreamExecutionEnvironment.execute` to start job creation and submission;
3. The client-side execution flow of `StreamExecutionEnvironment.execute`:
   - call `StreamExecutionEnvironment.getStreamGraph`: it first creates a `StreamGraphGenerator` object, then calls `StreamGraphGenerator.generate` to produce the `StreamGraph`. The `StreamGraph` is built as follows:
     - create the `StreamGraph` object from the job's and the `StreamExecutionEnvironment`'s configuration plus context information;
     - iterate over `StreamExecutionEnvironment.transformations` and translate each `StreamTransformation`;
     - build a `StreamNode` from each `StreamTransformation` and store it in the `StreamGraph` member variable `Map<Integer, StreamNode> streamNodes`; a `StreamNode` contains one Flink operator plus the parameters/configuration that operator needs at runtime;
     - call `StreamGraph.addEdge` to build the input and output `StreamEdge` objects of every `StreamNode` and add them to the node's member variables `inEdges` and `outEdges` respectively; a `StreamEdge` carries the ids of its upstream and downstream `StreamNode`s and the data-transfer rules such as the `ShuffleMode` and the `StreamPartitioner`;
   - call `StreamExecutionEnvironment.execute(StreamGraph)` to run the submission flow and wait for the job status to be returned;
   - inside `StreamExecutionEnvironment.executeAsync`, call `DefaultExecutorServiceLoader.getExecutorFactory` to scan the jars' `META-INF/services` directories and load a suitable `ExecutorFactory` via Java SPI (see the sketch below); for the current job the applicable one is `YarnJobClusterExecutorFactory`;
   - obtain the `YarnJobClusterExecutor` through `YarnJobClusterExecutorFactory`, then run `YarnJobClusterExecutor.execute`:
     - call `PipelineExecutorUtils.getJobGraph` to convert the `StreamGraph` into a `JobGraph`; the important conversion logic lives in `StreamingJobGraphGenerator.createJobGraph`. The main steps of building the `JobGraph` are: create a randomly generated `JobID` (rendered as 32 hex characters); generate a globally unique hash for every vertex of the graph (the user can set it via `DataStream.uid`); generate the `JobVertex`es, the higher-level abstraction of Flink tasks (containing the operators, `invokableClass`, slot-sharing and operator-chaining information), and store them in the `JobGraph` member variable `taskVertices`; in addition, the `ExecutionConfig`, savepoint configuration, jar paths, classpaths, etc. are attached;
     - call `YarnClusterClientFactory.getClusterSpecification` to read the JobManager/TaskManager memory settings of the submitted job from the `Configuration`, used to verify that the YARN cluster has enough resources to start the job;
     - call `YarnClusterDescriptor.deployJobCluster` to perform the actual job submission; it returns a `ClusterClientJobClientAdapter` object, which internally talks to the YARN cluster through a `RestClusterClient` and can be used to query the job status or perform other operations;
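For readers unfamiliar with Java SPI, this is the lookup mechanism used here: a jar lists implementations of an interface in a `META-INF/services/<fully-qualified interface name>` file, and `ServiceLoader` instantiates them at runtime. A generic sketch with a made-up factory interface (not Flink's actual `ExecutorFactory` API):

```java
// Generic Java SPI lookup sketch. The ExecutorFactory interface here is hypothetical;
// Flink's DefaultExecutorServiceLoader performs the same kind of ServiceLoader scan.
import java.util.ServiceLoader;

interface ExecutorFactory {
    boolean isCompatibleWith(String executorName);
}

public class SpiLookup {
    public static ExecutorFactory find(String executorName) {
        // Reads every META-INF/services/<ExecutorFactory FQN> file on the classpath
        // and instantiates the listed implementations.
        for (ExecutorFactory factory : ServiceLoader.load(ExecutorFactory.class)) {
            if (factory.isCompatibleWith(executorName)) {
                return factory;
            }
        }
        throw new IllegalStateException("No ExecutorFactory found for " + executorName);
    }
}
```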
4. Call `YarnClusterDescriptor.deployJobCluster` to perform the job submission:
   - call `YarnClusterDescriptor.deployInternal`; the code blocks until the JobManager submission to YARN has completed. The logic is:
     - perform Kerberos authentication (if required);
     - verify that the context environment & configuration satisfy the job submission preconditions;
     - check whether a suitable yarn queue exists;
     - validate/adapt the `ClusterSpecification` (against the cluster's YARN Container vCores/Memory configuration);
     - call `YarnClusterDescriptor.startAppMaster` to start the AppMaster (detailed below);
     - write the JobManager-related information returned by `startAppMaster` and the `applicationId` into the `Configuration`;
   - call `YarnClusterDescriptor.startAppMaster`:
     - read the `FileSystem` configuration from the `Configuration`, then load the jars from the `plugins` directory and initialize the `FileSystem` instances;
     - write the ZooKeeper namespace into the `Configuration` object; it can be set via `high-availability.cluster-id` and defaults to the `applicationId`;
     - create a `YarnApplicationFileUploader` object, then upload `log4j.properties`, `logback.xml`, `flink-conf.yaml`, `yarn-site.xml`, the user jars, the system jars, the plugin jars, the serialized `JobGraph`, the Kerberos configuration/credentials, etc. to the `hdfs:///user/$username/.flink/$applicationId/` directory;
     - call `JobManagerProcessUtils::processSpecFromConfigWithNewOptionToInterpretLegacyHeap` to read the memory configuration from the `Configuration` and compute the memory budget of the JobManager process, eventually applied to the JVM startup as `-Xmx, -Xms, -XX:MaxMetaspaceSize, -XX:MaxDirectMemorySize`; the memory calculation also handles the legacy options `FLINK_JM_HEAP, jobmanager.heap.size, jobmanager.heap.mb` for backwards compatibility;
     - call `YarnClusterDescriptor.setupApplicationMasterContainer` to create the `ContainerLaunchContext` (the set of information needed to launch the Container);
     - pack the `ApplicationName`, `ContainerLaunchContext`, `Resource` (the resources requested from the ResourceManager), `CLASSPATH`, environment variables, `ApplicationType`, `yarn.application.node-label`, `yarn.tags`, etc. into the `ApplicationSubmissionContext`; at this point the `ApplicationSubmissionContext` holds everything the ResourceManager needs to start the ApplicationMaster (see the YARN-client sketch after this list);
     - register a "submission failed" callback for the current thread, used to kill the application & delete the files that have been uploaded to HDFS if the submission fails;
     - call `YarnClientImpl.submitApplication` to hand the submission over to the YARN client API;
     - poll `YarnClientImpl.getApplicationReport` until the submission succeeds (the AppMaster has started normally), and finally return the application status as an `ApplicationReport`;
   - inside `YarnClientImpl.submitApplication`, the job is submitted to YARN by calling `rmClient.submitApplication`:
     - the member variable `YarnClientImpl.rmClient` is obtained via `ConfiguredRMFailoverProxyProvider.getProxy`; the `rmClient` instance is a proxy of `ApplicationClientProtocolPBClientImpl`, which submits the application to the YARN server side via ProtoBuf + RpcEngine;
     - on the YARN server side, `ClientRMService.submitApplication` receives all job submission requests coming from YARN clients and performs the subsequent AppMaster startup;
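Stripped of the Flink-specific parts, the submission path above is the standard YARN client API sequence: create an application, fill in a `ContainerLaunchContext` and an `ApplicationSubmissionContext`, and submit it. A bare-bones sketch of that sequence (not Flink's `YarnClusterDescriptor` code; the AM launch command, resource figures and names are placeholders, and local resources/error handling are omitted):

```java
// Minimal YARN application submission sketch; illustrates the objects named above.
import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class SubmitAppMasterSketch {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();

        // Roughly what setupApplicationMasterContainer builds: the AM launch command, env, local resources.
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList(
                "$JAVA_HOME/bin/java <jvm-memory-flags> <app-master-main-class> 1>stdout 2>stderr"));

        appContext.setApplicationName("flink-per-job-demo");
        appContext.setApplicationType("Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(Resource.newInstance(1600 /* MB */, 1 /* vcores */));

        // Hands the request to the ResourceManager (ClientRMService.submitApplication on the server side).
        yarnClient.submitApplication(appContext);
    }
}
```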
[Server] Starting the JobManager
// Server-side flow: starting the JobManager inside a Container
[1] -> YarnJobClusterEntrypoint::main(String[] args)
-> -> EnvironmentInformation.logEnvironmentInfo
-> -> SignalHandler.register
-> -> JvmShutdownSafeguard.installAsShutdownHook
-> -> YarnEntrypointUtils.logYarnEnvironmentInformation
-> -> YarnEntrypointUtils.loadConfiguration
-> -> ClusterEntrypoint::runClusterEntrypoint
[2] -> -> -> YarnJobClusterEntrypoint.startCluster
-> -> -> -> PluginUtils.createPluginManagerFromRootFolder
-> -> -> -> ClusterEntrypoint.configureFileSystems
-> -> -> -> ClusterEntrypoint.installSecurityContext
-> -> -> -> ClusterEntrypoint.runCluster
-> -> -> -> -> ClusterEntrypoint.initializeServices
-> -> -> -> -> YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory
-> -> -> -> -> DefaultDispatcherResourceManagerComponentFactory.create
-> -> -> -> -> -> JobRestEndpointFactory.createRestEndpoint -> MiniDispatcherRestEndpoint.start
-> -> -> -> -> -> YarnResourceManagerFactory.createResourceManager -> YarnResourceManager.start
-> -> -> -> -> -> ZooKeeperUtils.createLeaderRetrievalService -> ZooKeeperLeaderRetrievalService.start
-> -> -> -> -> -> DefaultDispatcherRunnerFactory.createDispatcherRunner
-> -> -> -> -> -> -> JobDispatcherLeaderProcessFactoryFactory.createFactory
-> -> -> -> -> -> -> DefaultDispatcherRunner::create
-> -> -> -> -> -> -> -> DispatcherRunnerLeaderElectionLifecycleManager::createFor
-> -> -> -> -> -> -> -> -> DispatcherRunnerLeaderElectionLifecycleManager::new -> ZooKeeperLeaderElectionService.start
-> -> -> -> -> -> -> -> -> -> ZooKeeperLeaderElectionService.isLeader
-> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherRunner.grantLeadership
-> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess
-> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherLeaderProcess.start -> AbstractDispatcherLeaderProcess.startInternal
-> -> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherLeaderProcess.onStart
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherGatewayServiceFactory.create
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherFactory.createDispatcher (create AkkaServer)
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> MiniDispatcher.start (call AkkaServer to execute job start)
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.onStart
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherBootstrap.initialize -> AbstractDispatcherBootstrap.launchRecoveredJobGraphs
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.runRecoveredJob
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.runJob
// The call chain is getting long; continue from `Dispatcher.runJob`
[3] -> Dispatcher.runJob
-> -> Dispatcher.createJobManagerRunner
-> -> -> DefaultJobManagerRunnerFactory.createJobManagerRunner
-> -> -> -> JobManagerRunnerImpl::new
-> -> -> -> -> DefaultJobMasterServiceFactory.createJobMasterService
-> -> -> -> -> -> JobMaster::new
-> -> Dispatcher.startJobManagerRunner
[4] -> -> -> JobManagerRunnerImpl.start
-> -> -> -> ZooKeeperLeaderElectionService.start -> ZooKeeperLeaderElectionService.isLeader
-> -> -> -> -> JobManagerRunnerImpl.grantLeadership -> JobManagerRunnerImpl.verifyJobSchedulingStatusAndStartJobManager -> JobManagerRunnerImpl.startJobMaster
-> -> -> -> -> -> ZooKeeperRunningJobsRegistry.setJobRunning
-> -> -> -> -> -> JobMaster.start
-> -> -> -> -> -> -> JobMaster.startJobExecution
-> -> -> -> -> -> -> -> JobMaster.setNewFencingToken
-> -> -> -> -> -> -> -> JobMaster.startJobMasterServices
-> -> -> -> -> -> -> -> JobMaster.resetAndStartScheduler
-> -> -> -> -> -> -> -> -> DefaultJobManagerJobMetricGroupFactory.create
-> -> -> -> -> -> -> -> -> JobMaster.createScheduler -> DefaultSchedulerFactory.createInstance -> DefaultScheduler::new
-> -> -> -> -> -> -> -> -> JobMaster.startScheduling
-> -> -> -> -> -> -> -> -> -> SchedulerBase.registerJobStatusListener
-> -> -> -> -> -> -> -> -> -> SchedulerBase.startScheduling
-> -> -> -> -> -> -> -> -> -> -> SchedulerBase.registerJobMetrics
-> -> -> -> -> -> -> -> -> -> -> SchedulerBase.startAllOperatorCoordinators
-> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.startSchedulingInternal
-> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.prepareExecutionGraphForNgScheduling
-> -> -> -> -> -> -> -> -> -> -> -> EagerSchedulingStrategy.startScheduling -> EagerSchedulingStrategy.allocateSlotsAndDeploy // streaming jobs use EagerSchedulingStrategy as the default scheduling strategy
-> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.allocateSlotsAndDeploy // explain ExecutionVertexID
[5] -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.allocateSlots -> DefaultExecutionSlotAllocator.allocateSlotsFor -> DefaultExecutionSlotAllocator.allocateSlot -> NormalSlotProviderStrategy.allocateSlot
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> SchedulerImpl.allocateSlot -> SchedulerImpl.allocateSlotInternal -> SchedulerImpl.internalAllocateSlot -> SchedulerImpl.allocateSingleSlot -> SchedulerImpl.requestNewAllocatedSlot
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> SlotPoolImpl.requestNewAllocatedBatchSlot -> SlotPoolImpl.requestNewAllocatedSlotInternal -> SlotPoolImpl.requestSlotFromResourceManager
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> ResourceManager.requestSlot -> SlotManagerImpl.registerSlotRequest -> SlotManagerImpl.internalRequestSlot -> SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot -> SlotManagerImpl.allocateResource
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> ResourceActionsImpl.allocateResource -> YarnResourceManager.startNewWorker -> YarnResourceManager.requestYarnContainer -> AMRMClientAsyncImpl.addContainerRequest
[6] -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.waitForAllSlotsAndDeploy -> DefaultScheduler.deployAll -> DefaultScheduler.deployOrHandleError -> DefaultScheduler.deployTaskSafe
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultExecutionVertexOperations.deploy -> ExecutionVertex.deploy -> Execution.deploy
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> TaskDeploymentDescriptorFactory.createDeploymentDescriptor
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> RpcTaskManagerGateway.submitTask
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> TaskExecutor.submitTask [invoke TaskExecutor via the RPC gateway created when JobMaster.registerTaskManager was called]
1. In per-job mode the entry point of the AppMaster process is `YarnJobClusterEntrypoint.main`:
   - call `EnvironmentInformation::logEnvironmentInfo` to log the environment information;
   - call `SignalHandler::register` to register handlers for termination/exit signals such as `TERM, HUP, INT` (e.g. triggered by `kill`);
   - read the `PWD` variable via `System::getEnv`; this is the directory the process is launched from and contains the files needed to start the process, such as the jars, config, `job.graph` and `launch_container.sh` (links created with `ln`). The `PWD` directory is located at `${yarn.nodemanager.local-dirs}/usercache/hadoop/appcache/${applicationId}/${containerId}`;
   - call `YarnEntrypointUtils::logYarnEnvironmentInformation` to log YARN-related information;
   - call `YarnEntrypointUtils.loadConfiguration` to build the `Configuration` object from `flink-conf.yaml` and `System.env`;
   - create the `YarnJobClusterEntrypoint` object and add a shutdown hook to the current process (it deletes local files before the process exits);
   - call `ClusterEntrypoint::runClusterEntrypoint`, which starts the AppMaster by calling `YarnJobClusterEntrypoint.startCluster`, and then registers a termination callback on the `YarnJobClusterEntrypoint` object that logs the exit code and related information when the process ends;
2. Call `YarnJobClusterEntrypoint.startCluster`, which executes, in order:
   - call `PluginUtils::createPluginManagerFromRootFolder` to fill each plugin's name and jars into a `PluginDescriptor`, then wrap the `PluginDescriptor`s, together with the list of package names whose loading is delegated to the parent of the plugin class loader, into a `DefaultPluginManager`;
   - call `ClusterEntrypoint.configureFileSystems`, which uses Java SPI to load and initialize the `FileSystemFactory` services declared under the `META-INF/services/` directories of all jars (including common/plugin jars);
   - call `ClusterEntrypoint.installSecurityContext`: create the `ZooKeeperModule, HadoopModule, JaasModule` and `HadoopSecurityContext` objects and store them in member variables of `SecurityUtils`, then trigger `ClusterEntrypoint.runCluster` through the created `HadoopSecurityContext` object; from reading the code, the purpose of `installSecurityContext` is to add the necessary user permissions and environment-variable configuration to the runtime environment;
   - `ClusterEntrypoint.runCluster` execution flow:
     - call `ClusterEntrypoint.initializeServices` to initialize several service modules:
       - `commonRpcService`: the AkkaRpc service, handling local and remote service invocation requests;
       - `haServices`: the `ZooKeeperHaServices` service, handling interaction with ZooKeeper, e.g. leader election for components such as the JobManager/ResourceManager;
       - `blobServer`: handles upload/download/cleanup of the job's BLOB files; BLOBs include jars, RPC messages, etc.;
       - `heartbeatServices`: manages the heartbeat services of the job's components;
       - `metricRegistry`: the metric registry, holding the job's metrics and the `MetricReporter`s;
       - `processMetricGroup`: the system/process metric group, covering CPU, memory, GC, class loader, network, swap, and similar metrics;
       - `archivedExecutionGraphStore`: used to store serialized `ExecutionGraph`s(?);
     - call `YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory` to create the factory object `DefaultDispatcherResourceManagerComponentFactory`; the factory creates the `DispatcherResourceManagerComponent` object, and that creation process is where the core logic of job startup lives; as a side note, `FileJobGraphRetriever` deserializes the local file `job.graph` back into the `JobGraph`;
     - `DefaultDispatcherResourceManagerComponentFactory.create` execution flow:
       - create and start `WebMonitorEndpoint`, the REST service behind the Flink Web Dashboard;
       - create and start the `YarnResourceManager` and its internal service `SlotManager`, responsible for the job's resource management: requesting/releasing/book-keeping;
       - create and start the `MiniDispatcher`; during the Dispatcher startup a JobManager thread is created to finish starting the job;
       - register with ZooKeeper the leader retrieval/coordination services for the `ResourceManager` and `Dispatcher`, used for failover and recovery; coordination works by holding a lock file in ZooKeeper (see the sketch after this list);
       - the three services above, `WebMonitorEndpoint, YarnResourceManager, MiniDispatcher`, keep their own information under ZooKeeper, and `ZooKeeperHaServices` provides their high availability. Their startup is implemented with an "AkkaRPC + state machine" pattern: an AkkaServer is registered while each service object is created, different state-machine functions are invoked via AkkaRPC during startup, and finally the `onStart` callback runs the actual startup logic; the complete startup path is fairly involved and takes patience to read through. The state machine's default state is `StoppedState.STOPPED`; the AkkaServer creation logic lives in `AkkaRpcService.startServer`, which also calls `AkkaRpcService.registerAkkaRpcActor` to create the `AkkaRpcActor`; the `AkkaRpcActor` is the entry point that actually receives messages and drives the state machine, and the message-handling logic is in `AbstractActor.createReceive`;
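The ZooKeeper-based coordination above (the `ZooKeeperLeaderElectionService` granting leadership to the Dispatcher, ResourceManager and, later, the JobManagerRunner) is built on Apache Curator recipes. A minimal sketch of leader election with Curator's `LeaderLatch`, just to illustrate the grantLeadership/revokeLeadership idea; the connection string and path are placeholders, and this is not Flink's actual `ZooKeeperLeaderElectionService` code:

```java
// Leader election with Curator's LeaderLatch; a standalone sketch of the mechanism.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderElectionSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk-host:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Each contender creates a latch on the same path; ZooKeeper picks exactly one leader.
        LeaderLatch latch = new LeaderLatch(client, "/demo/dispatcher/leader-latch");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // Analogous to grantLeadership(): start the real work here.
                System.out.println("became leader, starting services");
            }

            @Override
            public void notLeader() {
                // Analogous to revokeLeadership(): stop or suspend the work.
                System.out.println("lost leadership, stopping services");
            }
        });
        latch.start();

        Thread.sleep(60_000); // keep contending for a while
        latch.close();
        client.close();
    }
}
```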
3. Calling `Dispatcher.runJob` enters the JobManager startup flow:
   - call `Dispatcher.createJobManagerRunner` to create the `JobManagerRunnerImpl` and `JobMaster` objects; a `JobMaster` represents one running `JobGraph`, and during `JobMaster::new` important objects such as the `RpcServer`, `DefaultScheduler`, `SchedulerImpl`, `SlotPoolImpl`, `ExecutionGraph` and `CheckpointCoordinator` are created; the entry point for building the `ExecutionGraph` is `SchedulerBase.createExecutionGraph`;
   - call `Dispatcher.startJobManagerRunner` to run the whole job submission flow: start the service modules on the JobManager side, request slot resources to start the TaskManagers, submit/execute the tasks, and so on;
4. From the call to `JobManagerRunnerImpl.start` up to `DefaultScheduler.allocateSlotsAndDeploy`:
   - this stretch of the call chain mainly starts a few services, changes the job status and adds status listeners, and registers metrics for the JobManager; the code is fairly simple and is not described step by step here;
5. Calling `DefaultScheduler.allocateSlots` enters the resource allocation flow:
// TODO
try to find a slot among the existing resources; if none is found, ask the ResourceManager for a new one
AMRMClientAsyncImpl.CallbackHandlerThread // receives the allocated containers and calls YarnResourceManager.onContainersAllocated to start TaskManagers
AMRMClientAsyncImpl.HeartbeatThread // invokes AMRMClientImpl.allocate periodically, which asks the YARN ResourceManager to allocate resources
YarnResourceManager.requestYarnContainer // puts the container allocation request into the member variable AMRMClientImpl.ask
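The two threads noted above are part of the standard `AMRMClientAsync` usage pattern in YARN: the ApplicationMaster queues container requests, the heartbeat thread ships them to the ResourceManager inside `allocate` calls, and allocated containers come back through the callback handler. A generic sketch of that pattern (not Flink's `YarnResourceManager`; capacities, priorities and the launch step are placeholders):

```java
// Generic AMRMClientAsync usage sketch illustrating the request/callback flow described above.
import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class AmResourceLoopSketch {
    public static void main(String[] args) throws Exception {
        AMRMClientAsync.CallbackHandler handler = new AMRMClientAsync.CallbackHandler() {
            @Override
            public void onContainersAllocated(List<Container> containers) {
                // Flink's counterpart: YarnResourceManager.onContainersAllocated -> launch a TaskManager.
                containers.forEach(c -> System.out.println("allocated " + c.getId()));
            }
            @Override public void onContainersCompleted(List<ContainerStatus> statuses) {}
            @Override public void onNodesUpdated(List<NodeReport> nodes) {}
            @Override public void onShutdownRequest() {}
            @Override public void onError(Throwable e) {}
            @Override public float getProgress() { return 0.0f; }
        };

        // The heartbeat interval drives the AMRMClientAsyncImpl.HeartbeatThread mentioned above.
        AMRMClientAsync<ContainerRequest> amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, handler);
        amRmClient.init(new YarnConfiguration());
        amRmClient.start();
        amRmClient.registerApplicationMaster("am-host", 0, "");

        // Counterpart of YarnResourceManager.requestYarnContainer: queue a container request ("ask").
        Resource capability = Resource.newInstance(4096 /* MB */, 2 /* vcores */);
        amRmClient.addContainerRequest(new ContainerRequest(capability, null, null, Priority.newInstance(1)));

        Thread.sleep(60_000); // containers arrive via onContainersAllocated
        amRmClient.stop();
    }
}
```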
6. Calling `DefaultScheduler.waitForAllSlotsAndDeploy` enters the task deployment flow
// TODO
the allocated container resources are kept for subsequent slot assignment; start the TaskManager and register it with the JobManager
[Server] Starting the TaskManager
// TODO
- YarnResourceManager.onContainersAllocated
- org.apache.flink.yarn.YarnTaskExecutorRunner
Memory Calculation
- start command for TaskManager -> TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec
- resource request for TaskManager -> WorkerSpecContainerResourceAdapter.createAndMapContainerResource