Preface
Our team currently runs Flink 1.7.2 in per-job-on-YARN mode. Over the past year or so we have repeatedly hit memory-related problems, such as `beyond the 'PHYSICAL' memory limit... Killing container.`, and Flink Streaming's memory management has never felt entirely dependable in real-world scenarios. When Flink 1.10 was released I learned that it redesigned the TaskExecutor memory configuration and wanted to dig into it, but reading the community documentation left me with more questions. For example: when the TaskExecutor's JVM process starts, only three memory-related parameters, -Xmx, -Xms and -XX:MaxDirectMemorySize, are computed by Flink. How much do the new fine-grained options actually change these three JVM startup parameters? Or are they merely a convenience for memory calculation, so that someone already familiar with Flink's memory model could achieve the same effect with the old configuration parameters?
I originally planned this article as a comparison of the old and new memory management in Flink Streaming, but my spare time afterwards was eaten up by learning Rust, so I only got around to starting it at the end of June. While reading the memory-management code I had also skimmed some of the job-startup code, so I am widening the originally planned scope: the article mainly walks through the full startup flow of a Flink Streaming job, supplemented with an analysis of the memory allocation/management code. The Flink version used for reading the code is the latest release, 1.11.0.
Startup Flow Analysis
Startup flow: the whole process from script submission until the job has started successfully (the job state shown on the Flink Web Dashboard is RUNNING). It can be roughly divided into three stages:
- Client side: package the job information and submit it to YARN, register callbacks, and poll the job status;
- YARN allocates a container and starts the AppMaster;
- The AppMaster requests resources from YARN and starts the TaskManagers.
Example submission script:
// Per-job mode
flink run -m yarn-cluster -yn 24 -ys 2 -ytm 6g -ynm $job_name -c $main_class -d -yq ./$job_jar $params
StreamGraph:
graph LR
A(KafkaSource) --> B(MapOperator)
B --> C(KafkaSink)
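For reference in the call-flow analysis below, here is a minimal sketch of what the user job behind this StreamGraph might look like. The class name, topics, Kafka address and the map logic are illustrative placeholders, not taken from the original job:

// Hypothetical UserMainClass matching the KafkaSource -> Map -> KafkaSink topology above.
import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class UserMainClass {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // illustrative address
        props.setProperty("group.id", "demo-group");

        env.addSource(new FlinkKafkaConsumer<>("source-topic", new SimpleStringSchema(), props)) // KafkaSource
           .map(new MapFunction<String, String>() {                                              // MapOperator
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           })
           .addSink(new FlinkKafkaProducer<>("sink-topic", new SimpleStringSchema(), props));    // KafkaSink

        env.execute("demo-job");
    }
}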
[Client] Initializing the Job
// Client-side source call flow
[1] -> CliFrontend::main(String[] args)
-> -> CliFrontend.parseParameters
-> -> -> CliFrontend.run
[2] -> UserMainClass::main(args: Array[String])
-> -> StreamExecutionEnvironment.getExecutionEnvironment
-> -> StreamExecutionEnvironment.addSource
-> -> DataStream.map
-> -> DataStream.addSink
[3] -> -> StreamExecutionEnvironment.execute
-> -> -> StreamExecutionEnvironment.getStreamGraph
-> -> -> -> StreamExecutionEnvironment.getStreamGraphGenerator
-> -> -> -> StreamGraphGenerator.generate
-> -> -> StreamExecutionEnvironment.execute(StreamGraph)
-> -> -> -> StreamExecutionEnvironment.executeAsync
-> -> -> -> -> DefaultExecutorServiceLoader.getExecutorFactory
-> -> -> -> -> YarnJobClusterExecutorFactory.getExecutor
-> -> -> -> -> YarnJobClusterExecutor.execute
-> -> -> -> -> -> PipelineExecutorUtils::getJobGraph
-> -> -> -> -> -> YarnClusterClientFactory.createClusterDescriptor
[4] -> -> -> -> -> -> YarnClusterDescriptor.deployJobCluster
-> -> -> -> -> -> -> YarnClusterDescriptor.deployInternal
-> -> -> -> -> -> -> -> YarnClusterDescriptor.startAppMaster
-> -> -> -> -> -> -> -> -> YarnClientImpl.submitApplication
-> -> -> -> -> -> return CompletableFuture(new ClusterClientJobClientAdapter)
-> -> -> -> -> [action] get `JobClient`, invoke `JobListener`, return `JobClient`
-> -> -> -> [action] create `JobExecutionResult`, invoke `JobListener`, return `JobExecutionResult`
1. The submission script triggers org.apache.flink.client.cli.CliFrontend::main(String[] args), which then executes, in order:
- call EnvironmentInformation::logEnvironmentInfo to log the JVM environment;
- call GlobalConfiguration::loadConfiguration to load the configuration from flink-conf.yaml;
- call CliFrontend.loadCustomCommandLines, which returns a List<CustomCommandLine> made up of GenericCLI, FlinkYarnSessionCli and DefaultCLI; FlinkYarnSessionCli holds the command-line options specific to the yarn-cluster submission mode, such as -yid, -ynm, -yqu and so on;
- call CliFrontend.new to create the CliFrontend object, which uses the List<CustomCommandLine> from the previous step to build the Options list used for parsing command-line arguments and assigns it to the member variable customCommandLineOptions;
- call CliFrontend.parseParameters, which matches the first command argument run and dispatches to the corresponding handler;
- call CliFrontend.run:
  - parse the command arguments and wrap them in a CommandLine object;
  - create a ProgramOptions object, a wrapper around the job's command arguments that holds the values parsed from the CommandLine;
  - pass the ProgramOptions into the constructor of PackagedProgram; PackagedProgram is responsible for actually invoking the main method of the user jar;
  - execute CliFrontend.executeProgram, which creates the ExecutionEnvironmentFactory and StreamExecutionEnvironmentFactory objects for the job's execution context and then calls PackagedProgram.invokeInteractiveModeForExecution to invoke the user jar's main method via reflection and run the actual job logic (see the sketch below);
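The reflective invocation in the last step is conceptually very small. A minimal sketch of what "invoke the user jar's main method via reflection" amounts to, under the assumption that a user-code classloader has already been built from the jar (the real PackagedProgram additionally wires in the execution environment factories and error handling):

import java.lang.reflect.Method;

// Simplified sketch of invoking a job's entry point reflectively.
public final class ReflectiveMainInvoker {
    public static void invokeMain(ClassLoader userCodeClassLoader, String entryClassName, String[] args)
            throws Exception {
        // Load the entry class through the user-code classloader (built from the user jar).
        Class<?> entryClass = Class.forName(entryClassName, false, userCodeClassLoader);
        Method mainMethod = entryClass.getMethod("main", String[].class);
        // main is static, hence the null receiver; the String[] is passed as a single argument.
        mainMethod.invoke(null, (Object) args);
    }
}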
2. Once the user jar's entry point UserMainClass::main(args: Array[String]) has been invoked, it executes, in order:
- call StreamExecutionEnvironment::getExecutionEnvironment to create the StreamExecutionEnvironment and StreamContextEnvironment objects;
- call StreamExecutionEnvironment.addSource to create a DataStreamSource; the DataStreamSource holds the StreamExecutionEnvironment of the current context and a StreamTransformation, and the StreamTransformation holds the FlinkKafkaConsumer;
- call DataStream.map to create a OneInputTransformation, which holds the upstream StreamTransformation and the StreamExecutionEnvironment of the current context; the OneInputTransformation is then appended to the StreamExecutionEnvironment's member variable transformations;
- call DataStream.addSink to create a DataStreamSink and append it to the StreamExecutionEnvironment's transformations list as well;
- finally, call StreamExecutionEnvironment.execute to start job creation and submission;
3. The client-side execution flow of StreamExecutionEnvironment.execute:
- call StreamExecutionEnvironment.getStreamGraph, which first creates a StreamGraphGenerator and then calls StreamGraphGenerator.generate to produce the StreamGraph; the StreamGraph is generated as follows:
  - create the StreamGraph object from the job's and the StreamExecutionEnvironment's configuration plus the context information;
  - iterate over StreamExecutionEnvironment.transformations and resolve each StreamTransformation;
  - build a StreamNode from each StreamTransformation and store it in the StreamGraph's member variable Map<Integer, StreamNode> streamNodes; a StreamNode contains one Flink operator together with the parameters/configuration that operator needs at runtime;
  - call StreamGraph.addEdge to build the input and output StreamEdge objects of each StreamNode and add them to the StreamNode's member variables inEdges and outEdges; a StreamEdge records the ids of its upstream and downstream StreamNodes as well as data-exchange rules such as the ShuffleMode and the StreamPartitioner;
- call StreamExecutionEnvironment.execute(StreamGraph) to run the submission flow and wait for the job status to come back;
- inside StreamExecutionEnvironment.executeAsync, DefaultExecutorServiceLoader.getExecutorFactory scans the META-INF/services directories of the jars and loads a suitable ExecutorFactory via Java SPI (a minimal SPI sketch follows this list); for this job that is YarnJobClusterExecutorFactory;
- obtain the YarnJobClusterExecutor from YarnJobClusterExecutorFactory and run YarnJobClusterExecutor.execute:
  - call PipelineExecutorUtils.getJobGraph to convert the StreamGraph into a JobGraph; the important conversion logic lives in StreamingJobGraphGenerator.createJobGraph, and the main steps are: create a randomly generated JobID (a 32-character hex string); generate a globally unique hash for every vertex of the graph (the user can pin it via DataStream.uid); generate the JobVertex objects, the higher-level abstraction of a Flink Task containing the operators, invokableClass, SlotSharing, OperatorChaining and related information, and store them in the JobGraph's member variable taskVertices; in addition, the ExecutionConfig, savepoint configuration, jar paths, classpaths and similar information are set;
  - call YarnClusterClientFactory.getClusterSpecification to read the JobManager/TaskManager memory settings of the submitted job from the Configuration; they are used to check whether the YARN cluster has enough resources to start the job;
  - call YarnClusterDescriptor.deployJobCluster to run the actual submission; it returns a ClusterClientJobClientAdapter, which talks to the YARN cluster through a RestClusterClient and can be used to query the job status or perform other operations;
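DefaultExecutorServiceLoader is built on the standard Java SPI mechanism. Below is a minimal sketch of that lookup using a hypothetical factory interface; Flink's real ExecutorFactory selection matches factories against the configured execution target, which is approximated here by a simple name check:

import java.util.ServiceLoader;

// Hypothetical SPI interface; implementations would be listed in
// META-INF/services/<fully.qualified.ExecutorFactory> inside each jar.
interface ExecutorFactory {
    String name();
}

final class ExecutorFactoryLoader {
    // Scan the classpath for ExecutorFactory implementations and pick the one matching the target.
    static ExecutorFactory load(String executionTarget) {
        for (ExecutorFactory factory : ServiceLoader.load(ExecutorFactory.class)) {
            if (factory.name().equals(executionTarget)) {
                return factory;
            }
        }
        throw new IllegalStateException("No ExecutorFactory found for target " + executionTarget);
    }
}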
4. YarnClusterDescriptor.deployJobCluster runs the job submission:
- call YarnClusterDescriptor.deployInternal, which blocks until the JobManager has been submitted to YARN; its logic is:
  - perform Kerberos authentication (if required);
  - verify that the context environment and configuration satisfy the submission requirements;
  - check whether a suitable yarn queue exists;
  - validate/adapt the ClusterSpecification (against the cluster's YARN container vCores/memory configuration);
  - call YarnClusterDescriptor.startAppMaster to start the AppMaster (detailed below);
  - write the JobManager information returned by startAppMaster and the applicationId into the Configuration;
- call YarnClusterDescriptor.startAppMaster:
  - read the FileSystem settings from the Configuration, then load the jars under the plugins directory and initialize the FileSystem instances;
  - write the ZooKeeper namespace into the Configuration; it can be set via high-availability.cluster-id and defaults to the applicationId;
  - create a YarnApplicationFileUploader and upload log4j.properties, logback.xml, flink-conf.yaml, yarn-site.xml, the user jars, the system jars, the plugin jars, the serialized JobGraph and the Kerberos configuration/credentials to hdfs:///user/$username/.flink/$applicationId/;
  - call JobManagerProcessUtils::processSpecFromConfigWithNewOptionToInterpretLegacyHeap to read the memory configuration from the Configuration and compute the memory budget of the JobManager process, which eventually reaches the JVM startup as -Xmx, -Xms, -XX:MaxMetaspaceSize and -XX:MaxDirectMemorySize (a sketch of how such flags are assembled follows this list); the calculation also stays backwards compatible with the legacy options FLINK_JM_HEAP, jobmanager.heap.size and jobmanager.heap.mb;
  - call YarnClusterDescriptor.setupApplicationMasterContainer to create the ContainerLaunchContext (the set of information needed to launch the container);
  - package the ApplicationName, ContainerLaunchContext, Resource (the resources requested from the ResourceManager), CLASSPATH, environment variables, ApplicationType, yarn.application.node-label, yarn.tags and other information into the ApplicationSubmissionContext; at this point the ApplicationSubmissionContext contains everything the ResourceManager needs to start the ApplicationMaster;
  - register a failure callback for the current thread that kills the application and deletes the files already uploaded to HDFS if the submission fails;
  - call YarnClientImpl.submitApplication to hand the submission over to the YARN client API;
  - poll YarnClientImpl.getApplicationReport until the submission has succeeded (the AppMaster has started normally), and finally return the ApplicationReport with the application status;
- inside YarnClientImpl.submitApplication, the job is submitted to YARN through rmClient.submitApplication:
  - the member variable YarnClientImpl.rmClient is obtained via ConfiguredRMFailoverProxyProvider.getProxy; it is a proxy of ApplicationClientProtocolPBClientImpl, which submits the application to the YARN server side via ProtoBuf + RpcEngine;
  - on the YARN server side, ClientRMService.submitApplication receives all submission requests coming from YARN clients and carries out the subsequent AppMaster startup;
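The memory figures computed by JobManagerProcessUtils eventually show up as plain JVM flags in the container launch command. A rough, illustrative sketch of assembling such a flag string (the numbers and the helper class are made up; Flink's actual launch command also includes logging options, the classpath and the entry class):

// Illustrative only: turn a computed memory spec into JVM flags similar to those
// placed into the AppMaster's container launch command.
final class JvmArgsSketch {
    static String jvmArgs(long heapBytes, long directBytes, long metaspaceBytes) {
        return "-Xmx" + heapBytes
                + " -Xms" + heapBytes
                + " -XX:MaxDirectMemorySize=" + directBytes
                + " -XX:MaxMetaspaceSize=" + metaspaceBytes;
    }

    public static void main(String[] args) {
        // e.g. 1 GiB heap, 128 MiB direct, 256 MiB metaspace (illustrative values)
        System.out.println(jvmArgs(1073741824L, 134217728L, 268435456L));
    }
}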
[Server] Starting the JobManager
// Server-side flow of starting the JobManager inside the container
[1] -> YarnJobClusterEntrypoint::main(String[] args)
-> -> EnvironmentInformation.logEnvironmentInfo
-> -> SignalHandler.register
-> -> JvmShutdownSafeguard.installAsShutdownHook
-> -> YarnEntrypointUtils.logYarnEnvironmentInformation
-> -> YarnEntrypointUtils.loadConfiguration
-> -> ClusterEntrypoint::runClusterEntrypoint
[2] -> -> -> YarnJobClusterEntrypoint.startCluster
-> -> -> -> PluginUtils.createPluginManagerFromRootFolder
-> -> -> -> ClusterEntrypoint.configureFileSystems
-> -> -> -> ClusterEntrypoint.installSecurityContext
-> -> -> -> ClusterEntrypoint.runCluster
-> -> -> -> -> ClusterEntrypoint.initializeServices
-> -> -> -> -> YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory
-> -> -> -> -> DefaultDispatcherResourceManagerComponentFactory.create
-> -> -> -> -> -> JobRestEndpointFactory.createRestEndpoint -> MiniDispatcherRestEndpoint.start
-> -> -> -> -> -> YarnResourceManagerFactory.createResourceManager -> YarnResourceManager.start
-> -> -> -> -> -> ZooKeeperUtils.createLeaderRetrievalService -> ZooKeeperLeaderRetrievalService.start
-> -> -> -> -> -> DefaultDispatcherRunnerFactory.createDispatcherRunner
-> -> -> -> -> -> -> JobDispatcherLeaderProcessFactoryFactory.createFactory
-> -> -> -> -> -> -> DefaultDispatcherRunner::create
-> -> -> -> -> -> -> -> DispatcherRunnerLeaderElectionLifecycleManager::createFor
-> -> -> -> -> -> -> -> -> DispatcherRunnerLeaderElectionLifecycleManager::new -> ZooKeeperLeaderElectionService.start
-> -> -> -> -> -> -> -> -> -> ZooKeeperLeaderElectionService.isLeader
-> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherRunner.grantLeadership
-> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess
-> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherLeaderProcess.start -> AbstractDispatcherLeaderProcess.startInternal
-> -> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherLeaderProcess.onStart
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherGatewayServiceFactory.create
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherFactory.createDispatcher (create AkkaServer)
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> MiniDispatcher.start (call AkkaServer to execute job start)
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.onStart
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherBootstrap.initialize -> AbstractDispatcherBootstrap.launchRecoveredJobGraphs
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.runRecoveredJob
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.runJob
// The call chain is getting long; continue from `Dispatcher.runJob`
[3] -> Dispatcher.runJob
-> -> Dispatcher.createJobManagerRunner
-> -> -> DefaultJobManagerRunnerFactory.createJobManagerRunner
-> -> -> -> JobManagerRunnerImpl::new
-> -> -> -> -> DefaultJobMasterServiceFactory.createJobMasterService
-> -> -> -> -> -> JobMaster::new
-> -> Dispatcher.startJobManagerRunner
[4] -> -> -> JobManagerRunnerImpl.start
-> -> -> -> ZooKeeperLeaderElectionService.start -> ZooKeeperLeaderElectionService.isLeader
-> -> -> -> -> JobManagerRunnerImpl.grantLeadership -> JobManagerRunnerImpl.verifyJobSchedulingStatusAndStartJobManager -> JobManagerRunnerImpl.startJobMaster
-> -> -> -> -> -> ZooKeeperRunningJobsRegistry.setJobRunning
-> -> -> -> -> -> JobMaster.start
-> -> -> -> -> -> -> JobMaster.startJobExecution
-> -> -> -> -> -> -> -> JobMaster.setNewFencingToken
-> -> -> -> -> -> -> -> JobMaster.startJobMasterServices
-> -> -> -> -> -> -> -> JobMaster.resetAndStartScheduler
-> -> -> -> -> -> -> -> -> DefaultJobManagerJobMetricGroupFactory.create
-> -> -> -> -> -> -> -> -> JobMaster.createScheduler -> DefaultSchedulerFactory.createInstance -> DefaultScheduler::new
-> -> -> -> -> -> -> -> -> JobMaster.startScheduling
-> -> -> -> -> -> -> -> -> -> SchedulerBase.registerJobStatusListener
-> -> -> -> -> -> -> -> -> -> SchedulerBase.startScheduling
-> -> -> -> -> -> -> -> -> -> -> SchedulerBase.registerJobMetrics
-> -> -> -> -> -> -> -> -> -> -> SchedulerBase.startAllOperatorCoordinators
-> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.startSchedulingInternal
-> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.prepareExecutionGraphForNgScheduling
-> -> -> -> -> -> -> -> -> -> -> -> EagerSchedulingStrategy.startScheduling -> EagerSchedulingStrategy.allocateSlotsAndDeploy // streaming jobs use EagerSchedulingStrategy as the default scheduling strategy
-> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.allocateSlotsAndDeploy // explain ExecutionVertexID
[5] -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.allocateSlots -> DefaultExecutionSlotAllocator.allocateSlotsFor -> DefaultExecutionSlotAllocator.allocateSlot -> NormalSlotProviderStrategy.allocateSlot
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> SchedulerImpl.allocateSlot -> SchedulerImpl.allocateSlotInternal -> SchedulerImpl.internalAllocateSlot -> SchedulerImpl.allocateSingleSlot -> SchedulerImpl.requestNewAllocatedSlot
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> SlotPoolImpl.requestNewAllocatedBatchSlot -> SlotPoolImpl.requestNewAllocatedSlotInternal -> SlotPoolImpl.requestSlotFromResourceManager
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> ResourceManager.requestSlot -> SlotManagerImpl.registerSlotRequest -> SlotManagerImpl.internalRequestSlot -> SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot -> SlotManagerImpl.allocateResource
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> ResourceActionsImpl.allocateResource -> YarnResourceManager.startNewWorker -> YarnResourceManager.requestYarnContainer -> AMRMClientAsyncImpl.addContainerRequest
[6] -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.waitForAllSlotsAndDeploy -> DefaultScheduler.deployAll -> DefaultScheduler.deployOrHandleError -> DefaultScheduler.deployTaskSafe
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultExecutionVertexOperations.deploy -> ExecutionVertex.deploy -> Execution.deploy
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> TaskDeploymentDescriptorFactory.createDeploymentDescriptor
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> RpcTaskManagerGateway.submitTask
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> TaskExecutor.submitTask [invoke TaskExecutor via RPC which created by calling JobMaster.registerTaskManager]
1. In per-job mode the entry point of the AppMaster process is YarnJobClusterEntrypoint.main:
- call EnvironmentInformation::logEnvironmentInfo to log the environment variables;
- call SignalHandler::register to register handlers for termination/exit signals such as TERM, HUP and INT (e.g. kill);
- read the PWD variable via System::getEnv; this is the working directory of the process and contains the files needed to start it, such as the jar, config, job.graph and launch_container.sh files (links created with ln); the PWD directory is located at ${yarn.nodemanager.local-dirs}/usercache/hadoop/appcache/${applicationId}/${containerId};
- call YarnEntrypointUtils::logYarnEnvironmentInformation to log YARN-related information;
- call YarnEntrypointUtils.loadConfiguration to build the Configuration from flink-conf.yaml and System.env;
- create the YarnJobClusterEntrypoint object and add a shutdown hook to the current process, which deletes local files before the process exits (see the sketch below);
- call ClusterEntrypoint::runClusterEntrypoint, which starts the AppMaster via YarnJobClusterEntrypoint.startCluster and then registers a termination callback on the YarnJobClusterEntrypoint object that logs the exit code and related information when the process ends;
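The shutdown hook mentioned above is ordinary JVM machinery. A minimal sketch of registering a hook that removes a local working directory on exit (the class name and directory path are illustrative):

import java.io.File;

// Minimal sketch: delete a local working directory when the JVM shuts down.
final class CleanupHookSketch {
    static void installCleanupHook(File workingDir) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> deleteRecursively(workingDir), "cleanup-hook"));
    }

    private static void deleteRecursively(File file) {
        File[] children = file.listFiles();
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        file.delete();
    }

    public static void main(String[] args) {
        installCleanupHook(new File("/tmp/flink-appmaster-workdir")); // illustrative path
    }
}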
2. YarnJobClusterEntrypoint.startCluster executes, in order:
- call PluginUtils::createPluginManagerFromRootFolder to fill each plugin's name and jars into a PluginDescriptor, then wrap the PluginDescriptors and the list of packages whose loading is delegated to the parent of the plugin classloader into a DefaultPluginManager;
- call ClusterEntrypoint.configureFileSystems, which uses Java SPI to load and initialize all FileSystemFactory services declared under META-INF/services/ of all jars (including common and plugin jars);
- call ClusterEntrypoint.installSecurityContext to create the ZooKeeperModule, HadoopModule, JaasModule and HadoopSecurityContext objects, store them in SecurityUtils' member variables, and then trigger ClusterEntrypoint.runCluster through the created HadoopSecurityContext; judging from the code, the purpose of installSecurityContext is to add the necessary user permissions and environment variables to the runtime environment;
- ClusterEntrypoint.runCluster execution flow:
  - call ClusterEntrypoint.initializeServices to initialize several service modules: commonRpcService, the AkkaRpc service that handles local and remote invocations; haServices, the ZooKeeperHaServices that handles interaction with ZooKeeper, e.g. leader election for components such as the JobManager/ResourceManager; blobServer, which handles uploading/downloading/cleaning up the job's BLOB files (BLOBs include jars, RPC messages and so on); heartbeatServices, which manages the heartbeats of the job's components; metricRegistry, which registers metrics and holds the job's metrics and MetricReporters; processMetricGroup, the system/process metric group covering CPU, memory, GC, classloader, network, swap and similar metrics; and archivedExecutionGraphStore, which appears to store serialized ExecutionGraphs;
  - call YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory to create the factory DefaultDispatcherResourceManagerComponentFactory, which in turn creates the DispatcherResourceManagerComponent; the creation of that component is where the core startup logic of the job lives; as a side note, FileJobGraphRetriever deserializes the local file job.graph back into a JobGraph;
  - DefaultDispatcherResourceManagerComponentFactory.create execution flow:
    - create and start the WebMonitorEndpoint, the REST service behind the Flink Web Dashboard;
    - create and start the YarnResourceManager and its internal SlotManager, which handle the job's resource management: requesting, releasing and tracking resources;
    - create and start the MiniDispatcher; during the Dispatcher startup a JobManager thread is created to complete the job startup;
    - register the leader retrieval/coordination services of the ResourceManager and Dispatcher with ZooKeeper for failover; the coordination is done by holding a lock file in ZooKeeper;
    - the three services above (WebMonitorEndpoint, YarnResourceManager, MiniDispatcher) keep their state under ZooKeeper and rely on ZooKeeperHaServices for high availability; their startup follows an AkkaRPC + state-machine design: an AkkaServer is registered while each service object is constructed, different state-machine methods are invoked via AkkaRPC during startup, and eventually onStart is called back to run the actual startup logic; the complete startup logic is fairly involved and takes patience to read through; the default state of the state machine is StoppedState.STOPPED, the AkkaServer is created in AkkaRpcService.startServer, and inside AkkaRpcService.startServer an AkkaRpcActor is created via AkkaRpcService.registerAkkaRpcActor; the AkkaRpcActor is the entry point that receives messages and runs the state-machine logic, and the message handling is defined in AbstractActor.createReceive (a heavily simplified sketch of this start/onStart pattern follows the list);
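The "AkkaRPC + state machine" startup described above boils down to: constructing the service registers an RPC server, and calling start() moves the state machine forward until onStart() runs the real startup logic. A heavily simplified, Flink-independent sketch of that pattern (the class and method names are illustrative, not Flink's actual RPC classes):

// Simplified illustration of the start()/onStart() state-machine pattern; not Flink's actual RPC classes.
abstract class RpcEndpointSketch {
    private enum State { STOPPED, STARTED }

    private volatile State state = State.STOPPED; // default state, analogous to StoppedState.STOPPED

    // In Flink this would be a message dispatched to the endpoint's actor; here it is a direct call.
    public final void start() {
        if (state == State.STOPPED) {
            state = State.STARTED;
            onStart(); // the actual startup logic of the concrete service
        }
    }

    protected abstract void onStart();
}

final class MiniDispatcherSketch extends RpcEndpointSketch {
    @Override
    protected void onStart() {
        System.out.println("dispatcher started, launching recovered job graphs...");
    }
}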
3. Dispatcher.runJob enters the JobManager startup flow:
- call Dispatcher.createJobManagerRunner to create the JobManagerRunnerImpl and JobMaster objects; a JobMaster represents one running JobGraph, and during JobMaster::new important objects such as the RpcServer, DefaultScheduler, SchedulerImpl, SlotPoolImpl, ExecutionGraph and CheckpointCoordinator are created; the entry point for building the ExecutionGraph is SchedulerBase.createExecutionGraph;
- call Dispatcher.startJobManagerRunner to run the whole submission flow: start the JobManager-side service modules, request slot resources to start the TaskManagers, and submit/execute the tasks;
4. From the call to JobManagerRunnerImpl.start until DefaultScheduler.allocateSlotsAndDeploy:
- this stretch of the call chain mainly starts a few services, changes the job state, adds listeners, and registers metrics for the JobManager; the code is straightforward, so it is not walked through step by step here;
5. DefaultScheduler.allocateSlots enters the resource allocation flow (a simplified sketch of the decision follows these notes):
// TODO
find a slot among the existing resources; if none is available, request a new one from the ResourceManager
AMRMClientAsyncImpl.CallbackHandlerThread // receives the containers that have been allocated and calls YarnResourceManager.onContainersAllocated to start the TaskManager
AMRMClientAsyncImpl.HeartbeatThread // invokes AMRMClientImpl.allocate periodically, asking the YARN ResourceManager to allocate resources
YarnResourceManager.requestYarnContainer // puts the container allocation request into the member variable AMRMClientImpl.ask
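The notes above describe a two-level lookup: first try to satisfy the slot request from slots already held by the pool, otherwise ask the ResourceManager (and ultimately YARN) for a new container. A simplified, hypothetical sketch of that decision, not Flink's actual SlotPool code:

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the "reuse an existing slot, otherwise request a new one" decision.
interface SlotPoolSketch {
    Optional<String> pollAvailableSlot();                       // a free slot already held by the pool
    CompletableFuture<String> requestSlotFromResourceManager(); // triggers a new container request on YARN
}

final class SlotAllocatorSketch {
    static CompletableFuture<String> allocateSlot(SlotPoolSketch pool) {
        return pool.pollAvailableSlot()
                .map(CompletableFuture::completedFuture)          // existing slot: fulfilled immediately
                .orElseGet(pool::requestSlotFromResourceManager); // otherwise go through the ResourceManager
    }
}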
6. DefaultScheduler.waitForAllSlotsAndDeploy enters the task deployment flow
// TODO
the allocated container resources are kept for subsequent slot assignment; start the TaskManager and register it with the JobManager
[Server] Starting the TaskManager
// TODO
- YarnResourceManager.onContainersAllocated
- org.apache.flink.yarn.YarnTaskExecutorRunner
Memory Calculation
- start command for TaskManager -> TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec
- resource request for TaskManager -> WorkerSpecContainerResourceAdapter.createAndMapContainerResource
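To tie this back to the question raised in the preface, the fine-grained TaskExecutor memory options ultimately collapse into a handful of JVM flags. Below is a simplified sketch of the arithmetic as I understand the 1.10+ memory model; the component names follow the taskmanager.memory.* options, the values are illustrative, and the mapping should be read as an approximation rather than the exact implementation in TaskExecutorProcessUtils:

// Approximate mapping of the fine-grained TaskExecutor memory components onto JVM flags.
// Values are illustrative; the real derivation lives in TaskExecutorProcessUtils.
final class TaskExecutorMemorySketch {
    public static void main(String[] args) {
        long frameworkHeap    = mb(128);  // taskmanager.memory.framework.heap.size
        long taskHeap         = mb(1024); // taskmanager.memory.task.heap.size
        long frameworkOffHeap = mb(128);  // taskmanager.memory.framework.off-heap.size
        long taskOffHeap      = mb(0);    // taskmanager.memory.task.off-heap.size
        long network          = mb(128);  // taskmanager.memory.network.*
        long managed          = mb(512);  // taskmanager.memory.managed.size (native memory, not covered by the JVM flags)
        long metaspace        = mb(256);  // taskmanager.memory.jvm-metaspace.size
        long overhead         = mb(192);  // taskmanager.memory.jvm-overhead.*

        long heap   = frameworkHeap + taskHeap;                       // -> -Xmx / -Xms
        long direct = frameworkOffHeap + taskOffHeap + network;       // -> -XX:MaxDirectMemorySize
        long total  = heap + direct + managed + metaspace + overhead; // taskmanager.memory.process.size

        System.out.printf("-Xmx%d -Xms%d -XX:MaxDirectMemorySize=%d -XX:MaxMetaspaceSize=%d (process total: %d)%n",
                heap, heap, direct, metaspace, total);
    }

    private static long mb(long m) {
        return m * 1024 * 1024;
    }
}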