
Source Code Analysis of the Flink Streaming Startup Process

2020-06-26
tiny

Preface

Our team's Flink cluster currently runs version 1.7.2 in per-job on Yarn mode. Over the past year or so we have repeatedly hit memory-related problems, such as: beyond the 'PHYSICAL' memory limit... Killing container. It has always felt as though Flink Streaming's memory management falls short in real-world scenarios and runs into all kinds of issues. After Flink 1.10 was released, I learned that it redesigned the TaskExecutor memory configuration, which made me eager to dig in. Reading the community documentation, however, raised more questions. For example: when the JVM process backing a TaskExecutor starts, only the three memory-related arguments -Xmx, -Xms and -XX:MaxDirectMemorySize are computed by Flink. How much do the newly added fine-grained options actually change these three startup arguments, or are they merely a convenience for memory calculation? For someone already familiar with Flink's memory model, the old configuration options could achieve the same effect as the new ones.

This article was originally planned as a comparison of the old and new Flink Streaming memory management. My spare time was then eaten up by learning Rust, so I only found time to start writing at the end of June. While reading the memory-management code I had also skimmed some of the job-startup code, so I broadened the scope of the article: it mainly describes the entire Flink Streaming startup process, supplemented by analysis of the memory allocation/management code. The Flink source version used throughout is the latest release, 1.11.0.

Startup Process Analysis

Startup process: the entire flow from script submission to successful job startup (the job status shown on the Flink Web Dashboard becomes RUNNING). It can be roughly divided into 3 stages:

  1. Client side: package the submission information and hand it to Yarn, register callbacks, and poll the job status;
  2. Yarn allocates a Container and starts the AppMaster;
  3. The AppMaster requests resources from Yarn and starts the TaskManagers.

Example submission script:

# Per-job mode
flink run -m yarn-cluster -yn 24 -ys 2 -ytm 6g -ynm $job_name -c $main_class -d -yq ./$job_jar $params

StreamGraph:

graph LR
A(KafkaSource) --> B(MapOperator)
B --> C(KafkaSink)
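
For reference, the job analyzed below is roughly the following minimal sketch of the user program; the topic names, bootstrap servers and the uppercase map function are made up for illustration, using the universal Kafka connector API available in Flink 1.11:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaMapKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // assumed broker address
        props.setProperty("group.id", "demo-group");            // assumed consumer group

        // KafkaSource -> MapOperator -> KafkaSink, matching the StreamGraph above
        env.addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props))
           .map(String::toUpperCase)
           .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props));

        // triggers StreamGraph generation and job submission, which is what the rest of this article analyzes
        env.execute("kafka-map-kafka");
    }
}
```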

[Client] Initializing the Job


// Source-code call flow on the Client side
[1] -> CliFrontend::main(String[] args)
    -> -> CliFrontend.parseParameters
    -> -> -> CliFrontend.run
[2] -> UserMainClass::main(args: Array[String])
    -> -> StreamExecutionEnvironment.getExecutionEnvironment
    -> -> StreamExecutionEnvironment.addSource
    -> -> DataStream.map
    -> -> DataStream.addSink
[3] -> -> StreamExecutionEnvironment.execute
    -> -> -> StreamExecutionEnvironment.getStreamGraph
    -> -> -> -> StreamExecutionEnvironment.getStreamGraphGenerator
    -> -> -> -> StreamGraphGenerator.generate
    -> -> -> StreamExecutionEnvironment.execute(StreamGraph)
    -> -> -> -> StreamExecutionEnvironment.executeAsync
    -> -> -> -> -> DefaultExecutorServiceLoader.getExecutorFactory
    -> -> -> -> -> YarnJobClusterExecutorFactory.getExecutor
    -> -> -> -> -> YarnJobClusterExecutor.execute
    -> -> -> -> -> -> PipelineExecutorUtils::getJobGraph
    -> -> -> -> -> -> YarnClusterClientFactory.createClusterDescriptor
[4] -> -> -> -> -> -> YarnClusterDescriptor.deployJobCluster
    -> -> -> -> -> -> -> YarnClusterDescriptor.deployInternal
    -> -> -> -> -> -> -> -> YarnClusterDescriptor.startAppMaster
    -> -> -> -> -> -> -> -> -> YarnClientImpl.submitApplication
    -> -> -> -> -> -> return CompletableFuture(new ClusterClientJobClientAdapter)
    -> -> -> -> -> [action] get `JobClient`, invoke `JobListener`, return `JobClient`
    -> -> -> -> [action] create `JobExecutionResult`, invoke `JobListener`, return `JobExecutionResult`

1. The submission script triggers org.apache.flink.client.cli.CliFrontend::main(String[] args), which then executes the following in order:

  • Call EnvironmentInformation::logEnvironmentInfo to log the JVM environment information;
  • Call GlobalConfiguration::loadConfiguration to load the configuration from flink-conf.yaml;
  • Call CliFrontend.loadCustomCommandLines, which returns a List<CustomCommandLine> consisting of GenericCLI, FlinkYarnSessionCli and DefaultCLI; FlinkYarnSessionCli holds the command-line options related to the yarn-cluster submission mode, such as -yid, -ynm, -yqu, etc.;
  • Call the CliFrontend constructor to create the CliFrontend object; using the List<CustomCommandLine> obtained in the previous step, build the Options list used to parse command-line arguments and assign it to the member variable customCommandLineOptions;
  • Call CliFrontend.parseParameters, match the first command argument run, and dispatch to the corresponding handler;
  • Call CliFrontend.run:

    • Parse the command arguments and wrap them in a CommandLine object;
    • Create a ProgramOptions object, a wrapper around the job's command arguments that holds the values parsed from the CommandLine;
    • Pass ProgramOptions into the PackagedProgram constructor to create a PackagedProgram object; PackagedProgram is responsible for actually invoking the main function of the user jar;
    • Execute CliFrontend.executeProgram, which creates the ExecutionEnvironmentFactory and StreamExecutionEnvironmentFactory objects for the job execution context, then calls PackagedProgram.invokeInteractiveModeForExecution to invoke the user jar's main function via reflection and run the actual job logic (a sketch of this reflective call follows this list);
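
As a simplified illustration of that last step, the reflective invocation is essentially the standard "load class, find main, invoke" pattern. The jar path, entry class and arguments below are hypothetical, and the real PackagedProgram additionally sets up a child-first user-code classloader and the context classloader:

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

public class InvokeUserMainSketch {
    public static void main(String[] args) throws Exception {
        // load the user jar with a dedicated classloader (hypothetical jar path)
        URLClassLoader userCodeClassLoader = new URLClassLoader(
                new URL[]{new URL("file:///tmp/user-job.jar")},
                InvokeUserMainSketch.class.getClassLoader());

        // resolve the entry class named by the -c option (hypothetical class name)
        Class<?> entryClass = Class.forName("com.example.MyFlinkJob", false, userCodeClassLoader);

        // invoke its static main(String[]) reflectively, in the spirit of invokeInteractiveModeForExecution
        Method mainMethod = entryClass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) new String[]{"--input", "input-topic"});
    }
}
```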

2. Once the user jar's entry point UserMainClass::main(args: Array[String]) is invoked, the following happens in order:

  • Call StreamExecutionEnvironment::getExecutionEnvironment to create the StreamExecutionEnvironment (here a StreamContextEnvironment) object;
  • Call StreamExecutionEnvironment.addSource to create a DataStreamSource object; the DataStreamSource holds the StreamExecutionEnvironment from the context and a StreamTransformation object, and the StreamTransformation in turn holds the FlinkKafkaConsumer object;
  • Call DataStream.map to create a OneInputTransformation object, which holds the upstream StreamTransformation and the StreamExecutionEnvironment from the context; the OneInputTransformation is then added to the StreamExecutionEnvironment's member list transformations;
  • Call DataStream.addSink to create a DataStreamSink object and add it to the StreamExecutionEnvironment's member list transformations;
  • Finally, call StreamExecutionEnvironment.execute to start job creation and submission;

3. Client-side execution flow after StreamExecutionEnvironment.execute is called:

  • Call StreamExecutionEnvironment.getStreamGraph: first create a StreamGraphGenerator object, then call StreamGraphGenerator.generate to produce the StreamGraph. The generation works as follows:

    • Create the StreamGraph object from the job's StreamExecutionEnvironment configuration and context information;
    • Iterate over StreamExecutionEnvironment.transformations and translate each StreamTransformation;
    • Build a StreamNode from each StreamTransformation and store it in the StreamGraph's member variable Map<Integer, StreamNode> streamNodes; a StreamNode contains one Flink operator together with the parameters/configuration the operator needs at runtime;
    • Call StreamGraph.addEdge to build the input and output StreamEdge objects of each StreamNode and add them to the StreamNode's member variables inEdges and outEdges respectively;
    • A StreamEdge carries the IDs of its upstream and downstream StreamNodes plus the data-exchange rules such as the ShuffleMode and the StreamPartitioner;
  • Call StreamExecutionEnvironment.execute(StreamGraph) to run the submission flow and wait for the job status to come back;
  • Inside StreamExecutionEnvironment.executeAsync, DefaultExecutorServiceLoader.getExecutorFactory scans the META-INF/services directories of the jars on the classpath and loads a suitable ExecutorFactory via Java SPI; for this job it is YarnJobClusterExecutorFactory (see the SPI sketch after this list);
  • Obtain the YarnJobClusterExecutor from the YarnJobClusterExecutorFactory and execute YarnJobClusterExecutor.execute:
    • Call PipelineExecutorUtils.getJobGraph to convert the StreamGraph into a JobGraph. The important conversion logic lives in StreamingJobGraphGenerator.createJobGraph, whose main steps are: create a JobID made of 32 random hex characters; generate a globally unique hash for every vertex of the graph (the user can set it via DataStream.uid); generate the JobVertex objects, the upper-level abstraction of a Flink Task, which contain the operators, invokableClass, slot-sharing and operator-chaining information and are stored in the JobGraph member variable taskVertices; the JobGraph additionally carries the ExecutionConfig, savepoint config, jar paths, classpaths and so on;
    • Call YarnClusterClientFactory.getClusterSpecification to parse the JobManager/TaskManager memory settings of the submitted job from the Configuration, used to check whether the Yarn cluster has enough resources to start the job;
    • Call YarnClusterDescriptor.deployJobCluster to run the actual job submission and return a ClusterClientJobClientAdapter object, which internally communicates with the cluster through a RestClusterClient and can be used to query the job status or perform other operations;
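
The factory lookup in executeAsync is plain Java SPI. Below is a generic sketch of the mechanism; the ExecutorFactory interface and the "yarn-per-job" name are simplifications for illustration, not Flink's real PipelineExecutorFactory API:

```java
import java.util.ServiceLoader;

public class SpiLookupSketch {

    // stand-in for Flink's factory interface; implementations are listed in a file named
    // META-INF/services/SpiLookupSketch$ExecutorFactory inside each jar on the classpath
    public interface ExecutorFactory {
        String name();
        Runnable createExecutor();
    }

    public static void main(String[] args) {
        String target = "yarn-per-job"; // comparable to the execution.target option
        for (ExecutorFactory factory : ServiceLoader.load(ExecutorFactory.class)) {
            if (target.equals(factory.name())) {
                System.out.println("selected factory: " + factory.getClass().getName());
                factory.createExecutor().run();
                return;
            }
        }
        throw new IllegalStateException("no executor factory found for " + target);
    }
}
```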

4. Calling YarnClusterDescriptor.deployJobCluster performs the job submission:

  • Call YarnClusterDescriptor.deployInternal, which blocks until the JobManager has been submitted to Yarn; its logic is:

    • Perform Kerberos authentication (if required);
    • Verify that the environment and configuration satisfy the job submission requirements;
    • Check whether a suitable Yarn queue exists;
    • Validate/adapt the ClusterSpecification (against the cluster's Yarn container vCores/memory configuration);
    • Call YarnClusterDescriptor.startAppMaster to start the AppMaster (detailed below);
    • Write the JobManager information returned by startAppMaster and the applicationId into the Configuration;
  • Call YarnClusterDescriptor.startAppMaster:

    • Configuration获取FileSystem配置信息,然后从plugins目录加载jars 初始化FileSystem实例;
    • Zookeeper Namespace写入Configuration对象,可以通过high-availability.cluster-id配置,默认是applicationId
    • 创建YarnApplicationFileUploader对象,然后将log4j.properties, logback.xml, flink-conf.yaml, yarn-site.xml, UserJars, SystemJars, PluginsJars, JobGraph序列化kerberos配置/认证等文件上传到hdfs:///user/$username/.flink/$applicationId/目录下;
    • 调用JobManagerProcessUtils::processSpecFromConfigWithNewOptionToInterpretLegacyHeap,从Configuration获取Memory配置并计算出JobManager所在进程的Memory分配数值,最终以-Xmx, -Xms, -XX:MaxMetaspaceSize, -XX:MaxDirectMemorySize形式用到JVM进程启动;此外,Memory计算对旧版配置FLINK_JM_HEAP, jobmanager.heap.size, jobmanager.heap.mb做了兼容处理;
    • 调用YarnClusterDescriptor.setupApplicationMasterContainer创建ContainerLaunchContext(启动Container所需的信息集合);
    • ApplicationName, ContainerLaunchContext, Resource(向ResourceManager申请资源), CLASSPATH, Environment Variables, ApplicationType, yarn.application.node-label, yarn.tags等信息封装到ApplicationSubmissionContext;此时,ApplicationSubmissionContext就封装了ResourceManager启动ApplicationMaster所需的所有信息;
    • 为当前线程添加提交任务失败的回调函数,用于在提交任务失败后kill Application & delete Files that have been uploaded to HDFS
    • 调用YarnClientImpl.submitApplication将任务提交给Yarn Client Api处理;
    • 轮训YarnClientImpl.getApplicationReport等待提交任务提交成功(AppMaster正常启动),最后返回任务状态ApplicationReport

Inside YarnClientImpl.submitApplication, the job is submitted to Yarn by calling rmClient.submitApplication:

  • The member variable YarnClientImpl.rmClient is obtained through ConfiguredRMFailoverProxyProvider.getProxy; it is a proxy for ApplicationClientProtocolPBClientImpl, which submits the application to the Yarn server side via ProtoBuf plus an RpcEngine;
  • On the Yarn server side, ClientRMService.submitApplication receives all job submission requests from Yarn clients and carries out the subsequent AppMaster startup;
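
To make the JobManager memory step above more concrete, here is a back-of-the-envelope sketch of how a configured jobmanager.memory.process.size could be decomposed into the JVM flags mentioned earlier. The default values used (off-heap 128 MB, metaspace 256 MB, JVM overhead fraction 0.1 bounded to [192 MB, 1 GB]) are my reading of the Flink 1.11 defaults and should be checked against JobManagerProcessUtils rather than taken as authoritative:

```java
public class JobManagerMemorySketch {

    public static void main(String[] args) {
        long mb = 1024 * 1024;
        long processSize = 2048 * mb; // example: jobmanager.memory.process.size = 2g

        long offHeap   = 128 * mb;    // jobmanager.memory.off-heap.size       -> -XX:MaxDirectMemorySize
        long metaspace = 256 * mb;    // jobmanager.memory.jvm-metaspace.size  -> -XX:MaxMetaspaceSize
        long overhead  = clamp((long) (processSize * 0.1), 192 * mb, 1024 * mb);

        // whatever is left over becomes the JVM heap -> -Xmx / -Xms
        long heap = processSize - offHeap - metaspace - overhead;

        System.out.printf("-Xmx%d -Xms%d -XX:MaxMetaspaceSize=%d -XX:MaxDirectMemorySize=%d%n",
                heap, heap, metaspace, offHeap);
    }

    private static long clamp(long value, long min, long max) {
        return Math.max(min, Math.min(max, value));
    }
}
```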

[Server] Starting the JobManager

// Server-side flow: starting the JobManager inside the Container
[1] -> YarnJobClusterEntrypoint::main(String[] args)
    -> -> EnvironmentInformation.logEnvironmentInfo
    -> -> SignalHandler.register
    -> -> JvmShutdownSafeguard.installAsShutdownHook
    -> -> YarnEntrypointUtils.logYarnEnvironmentInformation
    -> -> YarnEntrypointUtils.loadConfiguration
    -> -> ClusterEntrypoint::runClusterEntrypoint
[2] -> -> -> YarnJobClusterEntrypoint.startCluster
    -> -> -> -> PluginUtils.createPluginManagerFromRootFolder
    -> -> -> -> ClusterEntrypoint.configureFileSystems
    -> -> -> -> ClusterEntrypoint.installSecurityContext
    -> -> -> -> ClusterEntrypoint.runCluster
    -> -> -> -> -> ClusterEntrypoint.initializeServices
    -> -> -> -> -> YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory
    -> -> -> -> -> DefaultDispatcherResourceManagerComponentFactory.create
    -> -> -> -> -> -> JobRestEndpointFactory.createRestEndpoint -> MiniDispatcherRestEndpoint.start
    -> -> -> -> -> -> YarnResourceManagerFactory.createResourceManager -> YarnResourceManager.start
    -> -> -> -> -> -> ZooKeeperUtils.createLeaderRetrievalService -> ZooKeeperLeaderRetrievalService.start
    -> -> -> -> -> -> DefaultDispatcherRunnerFactory.createDispatcherRunner
    -> -> -> -> -> -> -> JobDispatcherLeaderProcessFactoryFactory.createFactory
    -> -> -> -> -> -> -> DefaultDispatcherRunner::create
    -> -> -> -> -> -> -> -> DispatcherRunnerLeaderElectionLifecycleManager::createFor
    -> -> -> -> -> -> -> -> -> DispatcherRunnerLeaderElectionLifecycleManager::new -> ZooKeeperLeaderElectionService.start
    -> -> -> -> -> -> -> -> -> -> ZooKeeperLeaderElectionService.isLeader
    -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherRunner.grantLeadership
    -> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess
    -> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherLeaderProcess.start -> AbstractDispatcherLeaderProcess.startInternal
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherLeaderProcess.onStart
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherGatewayServiceFactory.create
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherFactory.createDispatcher (create AkkaServer)
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> MiniDispatcher.start (call AkkaServer to execute job start)
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.onStart
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherBootstrap.initialize -> AbstractDispatcherBootstrap.launchRecoveredJobGraphs
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.runRecoveredJob
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.runJob

// The call chain is getting long; start a new listing from `Dispatcher.runJob`
[3] -> Dispatcher.runJob
    -> -> Dispatcher.createJobManagerRunner
    -> -> -> DefaultJobManagerRunnerFactory.createJobManagerRunner
    -> -> -> -> JobManagerRunnerImpl::new
    -> -> -> -> -> DefaultJobMasterServiceFactory.createJobMasterService
    -> -> -> -> -> -> JobMaster::new
    -> -> Dispatcher.startJobManagerRunner
[4] -> -> -> JobManagerRunnerImpl.start
    -> -> -> -> ZooKeeperLeaderElectionService.start -> ZooKeeperLeaderElectionService.isLeader
    -> -> -> -> -> JobManagerRunnerImpl.grantLeadership -> JobManagerRunnerImpl.verifyJobSchedulingStatusAndStartJobManager -> JobManagerRunnerImpl.startJobMaster
    -> -> -> -> -> -> ZooKeeperRunningJobsRegistry.setJobRunning
    -> -> -> -> -> -> JobMaster.start
    -> -> -> -> -> -> -> JobMaster.startJobExecution
    -> -> -> -> -> -> -> -> JobMaster.setNewFencingToken
    -> -> -> -> -> -> -> -> JobMaster.startJobMasterServices
    -> -> -> -> -> -> -> -> JobMaster.resetAndStartScheduler
    -> -> -> -> -> -> -> -> -> DefaultJobManagerJobMetricGroupFactory.create
    -> -> -> -> -> -> -> -> -> JobMaster.createScheduler -> DefaultSchedulerFactory.createInstance -> DefaultScheduler::new
    -> -> -> -> -> -> -> -> -> JobMaster.startScheduling
    -> -> -> -> -> -> -> -> -> -> SchedulerBase.registerJobStatusListener
    -> -> -> -> -> -> -> -> -> -> SchedulerBase.startScheduling
    -> -> -> -> -> -> -> -> -> -> -> SchedulerBase.registerJobMetrics
    -> -> -> -> -> -> -> -> -> -> -> SchedulerBase.startAllOperatorCoordinators
    -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.startSchedulingInternal
    -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.prepareExecutionGraphForNgScheduling
    -> -> -> -> -> -> -> -> -> -> -> -> EagerSchedulingStrategy.startScheduling -> EagerSchedulingStrategy.allocateSlotsAndDeploy   // streaming jobs use EagerSchedulingStrategy as the default scheduling strategy
    -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.allocateSlotsAndDeploy    // explain ExecutionVertexID
[5] -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.allocateSlots -> DefaultExecutionSlotAllocator.allocateSlotsFor -> DefaultExecutionSlotAllocator.allocateSlot -> NormalSlotProviderStrategy.allocateSlot
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> SchedulerImpl.allocateSlot -> SchedulerImpl.allocateSlotInternal -> SchedulerImpl.internalAllocateSlot -> SchedulerImpl.allocateSingleSlot -> SchedulerImpl.requestNewAllocatedSlot
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> SlotPoolImpl.requestNewAllocatedBatchSlot -> SlotPoolImpl.requestNewAllocatedSlotInternal -> SlotPoolImpl.requestSlotFromResourceManager
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> ResourceManager.requestSlot -> SlotManagerImpl.registerSlotRequest -> SlotManagerImpl.internalRequestSlot -> SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot -> SlotManagerImpl.allocateResource
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> ResourceActionsImpl.allocateResource -> YarnResourceManager.startNewWorker -> YarnResourceManager.requestYarnContainer -> AMRMClientAsyncImpl.addContainerRequest
[6] -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.waitForAllSlotsAndDeploy -> DefaultScheduler.deployAll -> DefaultScheduler.deployOrHandleError -> DefaultScheduler.deployTaskSafe
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultExecutionVertexOperations.deploy -> ExecutionVertex.deploy -> Execution.deploy
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> TaskDeploymentDescriptorFactory.createDeploymentDescriptor
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> RpcTaskManagerGateway.submitTask
    -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> TaskExecutor.submitTask [invoke TaskExecutor via an RPC gateway created when JobMaster.registerTaskManager was called]

1. In per-job mode, the entry point of the AppMaster process is YarnJobClusterEntrypoint.main:

  • Call EnvironmentInformation::logEnvironmentInfo to log the environment information;
  • Call SignalHandler::register to register handlers for termination/exit signals such as TERM, HUP and INT (e.g. kill);
  • Use System::getenv to read the variable PWD, the working directory of the process; it contains the files needed to start the process, such as the jars, config, job.graph and launch_container.sh (links created with ln). The PWD directory is located at ${yarn.nodemanager.local-dirs}/usercache/hadoop/appcache/${applicationId}/${containerId};
  • Call YarnEntrypointUtils::logYarnEnvironmentInformation to log Yarn-related information;
  • Call YarnEntrypointUtils.loadConfiguration to build the Configuration object from flink-conf.yaml and the system environment variables;
  • Create the YarnJobClusterEntrypoint object and add a shutdown hook to the current process (it deletes local files before the process exits);
  • Call ClusterEntrypoint::runClusterEntrypoint, which internally calls YarnJobClusterEntrypoint.startCluster to perform the AppMaster startup, and then registers a termination callback on the YarnJobClusterEntrypoint object that logs the exit code and related information when the process ends;

2. Calling YarnJobClusterEntrypoint.startCluster executes the following in order:

  • Call PluginUtils::createPluginManagerFromRootFolder to fill each plugin's name and jars into a PluginDescriptor, then wrap the PluginDescriptors, together with the list of packages whose loading is delegated to the parent of the plugin classloader, into a DefaultPluginManager;
  • Call ClusterEntrypoint.configureFileSystems, which uses Java SPI to load and initialize the FileSystemFactory services declared under META-INF/services/ in all jars (including the common and plugin jars);
  • Call ClusterEntrypoint.installSecurityContext to create the ZooKeeperModule, HadoopModule, JaasModule and HadoopSecurityContext objects and store them in SecurityUtils' member variables, then trigger ClusterEntrypoint.runCluster through the newly created HadoopSecurityContext; from reading the code, the purpose of installSecurityContext is to add the necessary user-permission settings to the runtime environment;
  • Execution flow of ClusterEntrypoint.runCluster:
    • Call ClusterEntrypoint.initializeServices to initialize several service modules:
      • commonRpcService: an Akka-based RPC service that handles local and remote service invocation requests;
      • haServices: the ZooKeeperHaServices service, which handles interaction with ZooKeeper, e.g. leader election for components such as the JobManager/ResourceManager;
      • blobServer: handles upload/download/cleanup of the job's BLOB files; BLOBs include jars, RPC messages and so on;
      • heartbeatServices: manages the heartbeat services between the job's components;
      • metricRegistry: the metrics registry, holding the job's metrics and the MetricReporters;
      • processMetricGroup: the system/process metric group, covering CPU, Memory, GC, ClassLoader, Network, Swap and other metrics;
      • archivedExecutionGraphStore: stores serialized ExecutionGraphs;
    • Call YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory to create the factory object DefaultDispatcherResourceManagerComponentFactory, which is used to create the DispatcherResourceManagerComponent object; the creation of that object is where the core logic of job startup lives; as a side note, FileJobGraphRetriever deserializes the local file job.graph back into the JobGraph;
    • Execution flow of DefaultDispatcherResourceManagerComponentFactory.create:
      • Create and start the WebMonitorEndpoint, the REST service behind the Flink Web Dashboard;
      • Create and start the YarnResourceManager and its internal SlotManager, which handle the job's resource management: requesting, releasing and bookkeeping;
      • Create and start the MiniDispatcher; during the Dispatcher startup a JobManager thread is created to complete the job startup;
      • Register the leader retrieval/coordination services of the ResourceManager and Dispatcher with ZooKeeper for fault-tolerant recovery; coordination works by holding a lock file in ZooKeeper;
      • The three services above (WebMonitorEndpoint, YarnResourceManager, MiniDispatcher) each keep their own information under ZooKeeper, and ZooKeeperHaServices guarantees their high availability. Their startup follows an Akka RPC + state machine pattern: an AkkaServer is registered while each service object is created, different state-machine functions are invoked over Akka RPC during startup, and finally the onStart callback executes the actual startup logic; the complete startup logic is fairly involved and takes patience to read through. The default state of the state machine is StoppedState.STOPPED. The AkkaServer is created in AkkaRpcService.startServer, which also calls AkkaRpcService.registerAkkaRpcActor to create the AkkaRpcActor; the AkkaRpcActor is the entry point that receives messages and executes the state-machine logic, with the message handling defined in AbstractActor.createReceive. A simplified sketch of this lifecycle pattern follows.
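
The following is a deliberately simplified sketch of that "create the RPC server first, then drive the lifecycle through messages until onStart runs" pattern. Flink's real implementation lives in RpcEndpoint / AkkaRpcService / AkkaRpcActor; here a single-threaded executor merely stands in for the actor mailbox, which is an intentional simplification:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EndpointLifecycleSketch {

    enum State { STOPPED, STARTED }

    private volatile State state = State.STOPPED;

    // stand-in for the AkkaRpcActor mailbox that serializes all state-machine transitions
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    void start() {
        // comparable to sending a start message to the RPC actor instead of calling onStart directly
        mailbox.execute(() -> {
            if (state == State.STOPPED) {
                state = State.STARTED;
                onStart(); // the callback where the component's actual startup logic runs
            }
        });
    }

    protected void onStart() {
        System.out.println("component started");
    }

    public static void main(String[] args) {
        EndpointLifecycleSketch endpoint = new EndpointLifecycleSketch();
        endpoint.start();
        endpoint.mailbox.shutdown(); // let the sketch terminate after the queued message is processed
    }
}
```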

3. Calling Dispatcher.runJob enters the JobManager startup flow:

  • Call Dispatcher.createJobManagerRunner to create the JobManagerRunnerImpl and JobMaster objects; the JobMaster represents one running JobGraph, and during JobMaster::new important objects such as the RpcServer, DefaultScheduler, SchedulerImpl, SlotPoolImpl, ExecutionGraph and CheckpointCoordinator are created; the entry point for building the ExecutionGraph is SchedulerBase.createExecutionGraph;
  • Call Dispatcher.startJobManagerRunner to run the whole job submission flow: starting the JobManager-side service modules, requesting slot resources and starting TaskManagers, and submitting/executing the tasks;

4. From the call to JobManagerRunnerImpl.start through to DefaultScheduler.allocateSlotsAndDeploy:

  • This stretch of the call chain mainly starts a few services, changes the job status, registers listeners, and adds metrics for the JobManager; the code is fairly straightforward, so it is not covered in detail here;

5. Calling DefaultScheduler.allocateSlots enters the resource allocation flow (a usage sketch of the Yarn AM-RM client follows this list):

  • First try to find a slot among the existing resources; if none is found, request a new resource from the ResourceManager;
  • AMRMClientAsyncImpl.CallbackHandlerThread: picks up the containers that have been allocated and calls YarnResourceManager.onContainersAllocated to start TaskManagers in them;
  • AMRMClientAsyncImpl.HeartbeatThread: invokes AMRMClientImpl.allocate periodically, which asks the Yarn ResourceManager to allocate the requested resources;
  • YarnResourceManager.requestYarnContainer: puts the container allocation request into the member variable AMRMClientImpl.ask.
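
The container request path ultimately goes through Hadoop's asynchronous AM-RM client. Below is a minimal usage sketch of that client (Hadoop YARN API, not Flink code); the host, memory/vCores and priority values are assumptions for illustration:

```java
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public class AmRmClientSketch {

    public static void main(String[] args) throws Exception {
        AMRMClientAsync.CallbackHandler handler = new AMRMClientAsync.CallbackHandler() {
            @Override
            public void onContainersAllocated(List<Container> containers) {
                // Flink's YarnResourceManager.onContainersAllocated starts a TaskManager per container here
                containers.forEach(c -> System.out.println("allocated: " + c.getId()));
            }
            @Override public void onContainersCompleted(List<ContainerStatus> statuses) { }
            @Override public void onShutdownRequest() { }
            @Override public void onNodesUpdated(List<NodeReport> updatedNodes) { }
            @Override public float getProgress() { return 0; }
            @Override public void onError(Throwable e) { e.printStackTrace(); }
        };

        // heartbeat every second; the heartbeat thread calls allocate() and feeds results to the handler
        AMRMClientAsync<ContainerRequest> client = AMRMClientAsync.createAMRMClientAsync(1000, handler);
        client.init(new Configuration());
        client.start();
        client.registerApplicationMaster("appmaster-host", 0, ""); // assumed host/port/tracking URL

        // comparable to YarnResourceManager.requestYarnContainer: 6 GB / 2 vCores, priority 1 (assumed)
        client.addContainerRequest(
                new ContainerRequest(Resource.newInstance(6144, 2), null, null, Priority.newInstance(1)));
    }
}
```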

6. Calling DefaultScheduler.waitForAllSlotsAndDeploy enters the task deployment flow

// TODO

The allocated container resources are stored for later slot assignment; the TaskManager is started and registered with the JobManager.

[Server] Starting the TaskManager

// TODO

  1. YarnResourceManager.onContainersAllocated
  2. org.apache.flink.yarn.YarnTaskExecutorRunner

Memory Calculation

  • start command for TaskManager -> TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec
  • resource request for TaskManager -> WorkerSpecContainerResourceAdapter.createAndMapContainerResource
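
As a companion to the two entry points above, here is a rough sketch of how the fine-grained TaskExecutor memory components of Flink 1.10+/1.11 relate to the three JVM flags mentioned in the preface. The composition (heap = framework heap + task heap; direct = framework off-heap + task off-heap + network; managed memory allocated natively outside any JVM flag) reflects my reading of TaskExecutorProcessUtils and the docs, and the sizes are example values only:

```java
public class TaskExecutorMemorySketch {

    public static void main(String[] args) {
        long mb = 1024 * 1024;

        long frameworkHeap    = 128 * mb;   // taskmanager.memory.framework.heap.size
        long taskHeap         = 3072 * mb;  // taskmanager.memory.task.heap.size (example)
        long frameworkOffHeap = 128 * mb;   // taskmanager.memory.framework.off-heap.size
        long taskOffHeap      = 0;          // taskmanager.memory.task.off-heap.size
        long network          = 512 * mb;   // taskmanager.memory.network.* (example)
        long managed          = 2048 * mb;  // taskmanager.memory.managed.size (native memory, no JVM flag)

        long xmx    = frameworkHeap + taskHeap;                 // -> -Xmx / -Xms
        long direct = frameworkOffHeap + taskOffHeap + network; // -> -XX:MaxDirectMemorySize

        System.out.printf("-Xmx%d -Xms%d -XX:MaxDirectMemorySize=%d (managed memory outside JVM flags: %d)%n",
                xmx, xmx, direct, managed);
    }
}
```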



