DorisDB | ClickHouse | TIDB | |
---|---|---|---|
版本 | DorisDB-SE-1.15.2 | ClickHouse server version 21.6.5 revision 54448 | TiDB-v5.1.0 TiDB Server Community Edition (dev) |
定位 | ROLAP MPP 执行架构 支持物化视图(RollUp) 使用 Index 加速查询 向量化计算(AVX2 指令集) 稀疏索引 |
ROLAP MPP 架构的列存储库,多样化的表引擎 以有序存储为核心的完全列存储实现 并行扫磁盘上的数据块,降低单个查询的 latency 极致向量化计算 稀疏索引 |
HTAP 行存储(TiKV 存储方式) 列存储(TiFlash 组件实现) |
语言层面 | Front End: Java, Back End: C++ | C++ | TiDB: Go, TiKV: Rust, RocksDB: C++ |
架构层面 | Front End: 管理元数据 监督管理 BE 的上下线 解析 SQL,分发执行计划 协调数据导入 Back End: 接收并执行计算任务 存储数据,管理副本 执行 compact 任务 |
Zookeeper: 执行分布式 DDL; 多副本之间的数据同步任务管理下发; ClickHouse: 数据存储查询; 架构扁平化,可部署任意规模集群; |
TIDB 作为计算&调度层: SQL 解析/分发/谓词下推到存储层; 集群信息收集,replica 管理等; TiKV+RocksDB 作为数据存储层: TiKV 实现了事务支持/Raft 协议/数据读写; 数据最终存放在 RocksDB; TiFlash: 列存储,用来加速查询&支持 OLAP 相关需求; TiSpark: 计算引擎,用来支持 OLAP 相关需求; |
系统安装&维护 | 成本低; 收费版本提供 DorisManager 工具来安装维护集群,免费版本需要手动安装; 看官方文档说:“DorisDB 是一个自治的系统,节点的上下线,集群扩缩容都可通过一条简单的 SQL 命令来完成,数据的自动 Reblance”; 看各种技术文提到 DorisDB 维护成本也是较低,但不清楚是否能达到官方文档中的效果和是否有各种异常出现; |
成本高 Clickhouse 需安装的组件较少,安装起来也较为简单;但集群不能自发感知机器拓扑变化,不能自动 Rebalance 数据,扩容/缩容/上下线分片、副本等都需要修改配置文件或执行 SQL 语句做数据迁移表变更的操作,有些时还需需要重启服务,维护成本高; |
成本中; 个人感觉 TiDB 的组件较多, 计算存储相关就有 TiDB,TiKV,TiFlash,TiSpark 等; 所以系统安装相对麻烦一点(需要安装的组件较多),对于扩容/缩容/升级等操作官方有提供 TiUP 组件,操作相对简单; |
监控&报警 | 收费版本中可使用 DorisManager 提供的控报警的功能; 免费版本可以通过 Prometheus+Grafana 进行监控报警; |
Clikckhouse 支持对接 Prometheus ,可以通过 Prometheus+Grafana 进行监控报警; | 官方提供一套 Prometheus+Grafana 来实现监控报警的功能; |
系统升级 | 可以滚动/平滑的升级,官方有相关升级文档 | 通过 yum 命令进行升级 | 通过 TiUP 实施在线/离线升级 |
集群能力 | 官方文档中说集群规模可扩展到数百个节点, 支持 10PB 级别的数据分析; |
官方提供案例集群规模在 300+,20 万亿+条的数据, 10PB+级别的数据; |
官方文档中有说最大支持 512 个节点, 集群最大容量 PB 级; |
SQL 语句支持 | 支持标准 SQL 语法 | 支持大部分的标准 SQL 语法 有部分非标准 SQL 语法,有一定学习成本 |
支持标准 SQL 语法 |
SQL 数据类型 | Number, String[1, 65533], Date, HyperLogLog, Bitmap, Array, Nested-Array | Number, String[1, ∞], Date, Bitmap, Enum, UUID, IP, Tuple, Array, Nested Array, Map, Nested Type, AggregateFunction, Lambda Expression | Number, String[1, ∞], Date, Blob, Binary, Enum, Set, Json |
Schema 变更 | 支持对 column/rollup/index 等进行增删改的操作; | Distributed, Merge, MergeTree 等表引擎支持 Alter 语句,可以对 column/index 等进行增删改的操作;ALTER 操作会阻塞对表的所有读写操作; | 支持对 column/index 进行增删改的操作; |
函数支持 | 支持日期/地理/字符串/聚合/窗口/bitmap/hash/cast 等函数; 支持 UDF,但需要写 C++; |
支持函数较 DorisDB 丰富,复合数据类型的函数支持也很丰富,诸如:hasAny, hasAll 很是用; 支持 UDF,但需要写 C++; |
支持的函数种类也很多,但在 JSON/复合结构的函数支持上一般; 暂不支持 UDF; |
数据模型 | 明细模型,聚合模型,更新模型(允许定义 UNIQUE KEY,导入数据时需要将所有字段补全才能够完成更新操作) | 多样的表引擎提供了多样的数据聚合&分析模型 https://clickhouse.tech/docs/en/engines/table-engines/ | TiFlash:列存储,TiSpark: |
事务支持 | 每个导入任务都是一个事务 | 没有完整的事务支持 | 支持分布式事务,提供乐观事务与悲观事务两种事务模型 |
写入支持 | 支持从 Mysql, Hive, HDFS, S3, Local Disk, Kafka 等系统拉取数据; 支持通过 Spark, Flink, DataX, Insert into table select … 等方式写入数据; 支持 CSV、ORCFile、Parquet、Json 等文件格式; |
支持从 PostgreSQL, Mysql, Hive, HDFS, Kafka, Local Disk, 等系统拉取数据; 支持通过 Spark,Flink,Insert into table select …等方式写入数据; 支持 CSV, TSV, Json, Protobuf, Avro, Parquet, ORC, XML…等数据格式; |
支持从 Mysql, CSV, SQL 文件导入到 TiDB; 支持通过 Load data…等方式写入数据; TiDB Lightning 支持 Dumpling, CSV, Amazon Aurora Parquet 等数据格式; |
更新支持 | 支持 Insert/Insert into table select…语句; 不支持 update 语句,需要采用 Insert+Merge; |
支持 Insert/Insert into table select…语句; 支持 update 语句,通过语法 ALTER TABLE table_name UPDATE column1 = expr1 [, …] WHERE filter_expr 实现,更新相当于重建分区,较重操作,以异步方式执行; |
TiDB 支持 Insert/Insert into table select…语句; TiDB 支持 Update 语句; TiFlash 可以与 TiKV 数据保持一致,通过 Raft 实现时数据同步; |
删除支持 | 支持语句:DELETE FROM table_name [PARTITION partition_name] WHERE filter_expr | 支持语句:ALTER TABLE [db.]table [ON CLUSTER cluster] DELETE WHERE filter_expr | TiDB 支持 Delete 语句 |
查询支持 | Select 语句基本符合 SQL92 标准,支持 inner join,outer join,semi join,anti join,cross join。在 inner join 条件里除了支持等值 join,还支持不等值 join, 为了性能考虑,推荐使用等值 join。其它 join 只支持等值 join。 | 有非标准查询语法,支持 Join 语句,详细看社区文档: https://clickhouse.tech/docs/en/sql-reference/statements/select/ |
SELECT 语句与 MySQL 完全兼容; |
数据交换 | 外部表支持:Mysql, HDFS, ElasticSearch, Hive; 交互支持:Kafka, Flink, Spark, Hive, Http, Mysql wire protocol |
外部表支持:Mysql, HDFS, Http url, Jdbc, Odbc; 交互支持:Flink(通过 clickhouse-jdbc 实现), Spark, Http, Jdbc, Odbc, Native Tcp, Command line, Mysql wire protocol, SQL 可视化工具 |
交互支持:通过 Flink,TiSpark 进行查询操作; 对外暴漏的是 SQL 层,可以通过 JDBC, Mysql wire protocol 进行读写; |
集群模式 | Front End + Back End | Zookeeper + Clickhouse | TiDB + PD + TiKV + TiFlash + TiSpark |
问题遗留 | 当前不支持 update 语句,2021.7 月的新版本会支持 | optimize table $name final 性能不高,多表 join 性能一般 | |
简短概括 | Doris 依靠索引+Rollup+分区分桶等方案实现高效的 OLAP,支持明细查询;复合数据类型支持有限,维护成本低; | Clickhouse 依靠列存储+向量化+分片+多样的表引擎实现高效 OLAP,支持明细查询;功能强大,维护成本高; | Tidb 更多是满足了 TP 场景,对与 AP 支持不足,OLAP 能力弱于 Doris&Clickhouse,支持明细查询;官方目标是支持 100%TP 场景和 80%AP 场景,大数据量/复杂 AP 场景通过 TiSpark 解决,维护成本较低; |
综合对比 | DorisDB 和 Clickhouse 都是为 OLAP 而设计的系统,DorisDB 在系统运维等方面十分方便,但相对 Clickhouse 在对复合数据类型支持上不够,暂不支持 Update 操作,在数据模型支持上也稍弱于 Clickhouse,SQL 函数支持上没 ClickHouse 丰富; | Clickhouse 集群拓扑变化,分片上下线等都无法自动感知并进行 Reblance,维护成本高,多表 join 性能不稳定,但从 OLAP 引擎功能来讲 Clickhouse 是三者中最强大的一个,且有一定 Update 能力(OLAP 本身不善长 Update 操作); | TiDB 的 TP 能力三者最好,AP 能力较差,毕竟大家都说是分布式 Mysql,增/删/改/事务能力强,能同步 Mysql binlog,但 AP 能力更多依靠 TiFlash/TiSpark,查询聚合耗时较其他两者长且不可控,组件较多,学习安装成本感觉较 DorisDB 高一些; |
user_id
为主键,标签为列名的大宽表;Array
,MAP
,Array<Map>
等复合类型;DDL
语句;UPDATE
语句;优点:
UPDATE
操作;缺点:
复合数据类型只支持Set
/JSON
,对JSON
来说,其内部结构不可见,Schema
定义不够明确;
JSON
这种半结构化数据格式是以Binary
形式进行序列化存储的(JSON
本身不能创建索引,需要反序列化解析后才能做过滤操作?),用于圈选性能较差,为JSON
定义的函数不够丰富,且无法通过自定义函数来补充;
数据写入当前好想只能通过 Insert Into
语句,性能一般,不支持直接导入JSON
数据;TiDB
有DM
工具,但只能实时同步 Mysql Binlog
,缺少与多种其他存储介质的数据交换功能;Spark/Flink
的Sink Connector
好像没有,可能需要自己手动实现;
缺少和Hadoop
生态圈的交互以及数据同步组件,可能需要通过Jdbc
或TiKV API
接口实现;
其他:
TiKV Client Java Api
,Flink Source Connector
是通过这个包自己定制直接读取 TiKV
数据的,可以做一些Filter Push Down
;优点:
Array
这种复合结构(当前只明细模型可用);缺点:
不开源,DorisDB
源码全都看不到,社区活跃度较ClickHouse
差;
支持复合数据类型不够,遇到Array
,Map
或Array<Map>
类型的标签需要打成宽表或拆表(多表Join
性能良好);
当前不支持UPDATE
操作, 需要通过INSERT OVERWRITE
模拟更新操作(官方说是 7 月版本会支持UPDATE
);
当前只能在Duplicate Table
中定义Array
类型,而圈选功能需要选择更新/聚合模型,这使得Array
类型标签必须拆分到新表中,不太能满足圈人的功能需求;
其他:
发现一个比较怪的问题,Apache Doris
和DorisDB
感觉上有分歧?按道理Apache Doris
和DorisDB
标准版应该是同一个产品定位吧,两者间的的Flink Connector
实现代码差异较大,内置的UDF
函数也各不相同,源码层面二者还是有一定差别的;
Apache Doris
暂时不支持符合结构数据,如:Array
,Map
等;
优点:
支持UPDATE
操作(但性能一般,不如TiDB
,可以用INSERT OVERWRITE
模拟);
有多样的表引擎和SQL
函数;
支持多样的复合数据类型 Array
, Map
, Nested Type
(但尝试写JSON
数据到类型为Array<Nested>
列时失败,可能是写入方法问题?);
OLAP
系统,查询性能较好;
缺点:
Join
性能不稳定;SQL
,有学习成本;其他:
Apache License Version 2.0
;Apache
社区,项目规划走向被俄罗斯团队把控;ClickHouse
;ClickHouse
和DorisDB
在做OLAP
的性能和功能上高于TiDB
;DorisDB
的主要问题是对复合数据类型的支持不够(比如Array
),这使得很多是Array
类型的列必须进行拆表操作,业务成本高,增加了标签数据写入和查询等业务实现的复杂度;Update
这个必须的Feature
功能尚未看到;函数支持没ClickHouse
丰富,在做查询时有些过滤规则没办法实现(比如:Array hasAny/hasAll
);TiDB
对复合数据类型的支持不够,只有Set
,JSON
这种复合结构,与Hadoop
生态或其他外部存储结合度不高,数据的导入导出不够方便,支持导入的数据类型也不够丰富,使用上不太方便;函数支持上没ClickHouse
丰富,没办法做Array hasAny/hasAll
等操作;另外TiDB
的查询性能不保证满足需求,需做测试;ClickHouse
从功能角度来讲是最能满足用户圈选需求的系统,唯一的问题是维护成本较高,当前国内已有公司将Clickhouse
应用到了画像场景;ClickHouse | TiDB(No TiFlash) | |
---|---|---|
Linux 版本 | CentOS Linux release 7.2.1511 | CentOS Linux release 7.9.2009 |
CPU | QEMU Virtual CPU version (cpu64-rhel6) | Intel(R) Xeon(R) Platinum 8163 CPU @ 2.50GHz |
CPU 核数 | 8C | 8C |
CPU 线程数 | 8 | 8 |
内存大小 | 8G | 32G |
磁盘大小 | 1T | 1T |
-- 表`user_profile_a`数据量:4528641条
-- 表`user_profile_b`数据量:12836360条
-- SQL-1: 基本数据类型的组合过滤
select count(*) from user_profile_b where channel_type = 'nature' and user_type = 2 and lang = 'zh_CN' and region = 'CHN' and country = 'CHN' and gender = 0;
-- SQL-2 过滤double类型数值,通过Math.random() 为每条测试数据随机生成一个score
select count(*) from user_profile_ where score > 0.2;
-- SQL-3: 数组类型的过滤
select count() from user_profile_b where hasAny(user_watch_list, ['TSLA']); -- Clickhouse
select count(*) from user_profile_b where JSON_CONTAINS(user_watch_list, '"TSLA"', '$'); -- TIDB
-- SQL-4: 组合SQL-1, SQL-3的所有条件过滤
select count() from user_profile_b where channel_type = 'nature' and user_type = 2 and lang = 'zh_CN' and region = 'CHN' and country = 'CHN' and gender = 0 and hasAny(user_watch_list, ['TSLA']); -- ClickHouse
select count(*) from user_profile_b where channel_type = 'nature' and user_type = 2 and lang = 'zh_CN' and region = 'CHN' and country = 'CHN' and gender = 0 and JSON_CONTAINS(user_watch_list, '"TSLA"', '$'); -- TiDB
-- SQL-5: Join两张表,基本数据类型过滤条件
select count(*) from user_profile_b b, user_profile_a a where a.uuid = b.uuid and a.channel_type = 'nature' and a.user_type = 2 and b.lang = 'zh_CN' and b.region = 'CHN' and b.country = 'CHN' and b.gender = 0;
-- SQL-6: Join两张表,带有Array类型的过滤条件
select count() from user_profile_b b, user_profile_a a where a.uuid = b.uuid and a.channel_type = 'nature' and a.user_type = 2 and b.lang = 'zh_CN' and b.region = 'CHN' and b.country = 'CHN' and b.gender = 0 and hasAny(a.user_watch_list, ['TSLA']); -- ClickHouse
select count(*) from user_profile_b b, user_profile_a a where a.uuid = b.uuid and a.channel_type = 'nature' and a.user_type = 2 and b.lang = 'zh_CN' and b.region = 'CHN' and b.country = 'CHN' and b.gender = 0 and JSON_CONTAINS(a.user_watch_list, '"TSLA"', '$'); -- TiDB
ClickHouse | TiDB(No TiFlash) | Explain | |
---|---|---|---|
SQL-1 | 0.069s | 1.695s | 单表简单数据类型的条件组合查询 |
SQL-2 | 0.029s | 1.113s | 单表Double 类型数据的条件查询 |
SQL-3 | 1.732s | 30.935s | TiDB 官方文档说:”使用Binary 格式进行序列化,对JSON 的内部字段的查询、解析加快”,但从测试结果来看与CK 相差较多 |
SQL-4 | 0.229s | 1.142s | 应该有做SQL 优化:过滤简单数据类型的条件前置,减少需过滤Array 条件的数据量,查询性能较SQL-3 提升不少 |
SQL-5 | 0.626s | 0.536s | 两张表Join 情况下TiDB 要略好于 CK ,看来CK 多表Join 性能确实不太好? |
SQL-6 | 1.243s | 0.880s | 两张表Join 情况下TiDB 要略好于 CK ,看来CK 多表Join 性能确实不太好? |
DorisDB
纳入对比,一方面是因为DorisDB
满足不了画像场景的业务需求,比如对Array
等复合类型支持不够,另一方面和DorisDB
的工作人员沟通得到了一些性能方面的测试结论,自己假设了DorisDB
的单表性能与ClickHouse
相当,多表Join
性能高于ClickHouse
;ClickHouse
在多表Join
方面表现一般,若有多表做复杂Join
或大表间做Join
操作的需求,建议做更多具体的性能测试;Array
查询优化来讲,在ClickHouse
中可以考虑将Array
转Bitmap
来优化查询;TiDB
有TiFlash
,但不太清楚如何处理Array
这类复合类型,是否能满足性能需求;Insert into select
语句的性能大都取决于Select
,非特别大数据量需写入的情况下Insert
耗时相对较少;ClickHouse
的Delete
,Update
语句性能不高,这个需要再做测试,或用Insert
语句替换;ClickHouse
的 optimize table $table final
性能不高,1200W
数据测试需要8s~14s
时间,生产上使用该操作时最好单独周期执行,比如5min
执行一次;当前团队Flink
集群使用的版本是1.7.2
,采用per-job on yarn
的运行模式,在近一年多的使用过程中碰到过多次内存相关的问题,比如:beyond the 'PHYSICAL' memory limit... Killing container.
,总是感觉Flink Streaming
在实际场景中的内存管理不够完美,会遇到各样的问题。在Flink 1.10
版本 release 后,了解到该版本对TaskExecutor
的内存配置做了重新设计,内心有想要去了解的冲动,然而看过社区文档后又有了更多的疑问,比如:TaskExecutor
对应的 JVM 进程在启动时只会有-Xmx -Xms -XX:MaxDirectMemorySize
三个内存相关参数是通过Flink
计算得出的,新增的细粒度配置能给JVM
这三个启动参数带来多少变化,或是只是一个方便内存计算的工具,对于对Flink
内存较为了解的人来讲,通过旧的内存配置参数可以完成与新配置一样的效果。
起初这篇文章计划写Flink Streaming
新/旧内存管理对比相关的内容,然而之后一段时间的业余精力被学习Rust
消耗掉啦,6 月底才算有时间开篇;之前在阅读内存管理代码同时参杂读了些任务启动相关代码,所以就扩展下之前计划写的文章范围:以描述Flink Streaming
整个启动流程为主,辅以内存分配/管理相关代码分析,阅读的Flink
代码版本定为最新版本1.11.0
。
启动流程: 从脚本提交到任务启动成功(Flink-Web-Dashboard
展示的任务状态为running
)的整个流程,这个流程大致分为 3 个Stage
:
Client
端:封装信息提交给Yarn
,注册回调函数,轮训 Job
状态;Yarn
分配Container
启动AppMaster
;AppMaster
向Yarn
申请资源启动TaskManager
;脚本示例:
// Per-job model
flink run -m yarn-cluster -yn 24 -ys 2 -ytm 6g -ynm $job_name -c $main_class -d -yq ./$job_jar $params
StreamGraph:
graph LR
A(KafkaSource) --> B(MapOperator)
B --> C(KafkaSink)
// Client源码调用流程
[1] -> CliFrontend::main(String[] args)
-> -> CliFrontend.parseParameters
-> -> -> CliFrontend.run
[2] -> UserMainClass::main(args: Array[String])
-> -> StreamExecutionEnvironment.getExecutionEnvironment
-> -> StreamExecutionEnvironment.addSource
-> -> DataStream.map
-> -> DataStream.addSink
[3] -> -> StreamExecutionEnvironment.execute
-> -> -> StreamExecutionEnvironment.getStreamGraph
-> -> -> -> StreamExecutionEnvironment.getStreamGraphGenerator
-> -> -> -> StreamGraphGenerator.generate
-> -> -> StreamExecutionEnvironment.execute(StreamGraph)
-> -> -> -> StreamExecutionEnvironment.executeAsync
-> -> -> -> -> DefaultExecutorServiceLoader.getExecutorFactory
-> -> -> -> -> YarnJobClusterExecutorFactory.getExecutor
-> -> -> -> -> YarnJobClusterExecutor.execute
-> -> -> -> -> -> PipelineExecutorUtils::getJobGraph
-> -> -> -> -> -> YarnClusterClientFactory.createClusterDescriptor
[4] -> -> -> -> -> -> YarnClusterDescriptor.deployJobCluster
-> -> -> -> -> -> -> YarnClusterDescriptor.deployInternal
-> -> -> -> -> -> -> -> YarnClusterDescriptor.startAppMaster
-> -> -> -> -> -> -> -> -> YarnClientImpl.submitApplication
-> -> -> -> -> -> return CompletableFuture(new ClusterClientJobClientAdapter)
-> -> -> -> -> [action] get `JobClient`, invoke `JobListener`, return `JobClient`
-> -> -> -> [action] create `JobExecutionResult`, invoke `JobListener`, return `JobExecutionResult`
1. 任务提交脚本会触发调用org.apache.flink.client.cli.CliFrontend::main(String[] args)
,然后依次会执行:
EnvironmentInformation::logEnvironmentInfo
加载 JVM 上线文环境;GlobalConfiguration::loadConfiguration
加载flink-conf.yaml
配置信息;CliFrontend.loadCustomCommandLines
返回由GenericCLI
,FlinkYarnSessionCli
,DefaultCLI
组成的List<CustomCommandLine>
对象;FlinkYarnSessionCli
中包含与yarn-cluster
提交模式相关,可以通过Command
命令提交的参数列表,如:-yid, -ynm, -yqu
等等;CliFrontend.new
创建CliFrontend
对象,利用[3]
中的List<CustomCommandLine>
获取有效可用于解析Command
参数的Options
列表,附值给成员变量customCommandLineOptions
;CliFrontend.parseParameters
,匹配Command
第一个参数run
然后调用后序函数;调用CliFrontend.run
:
Command
参数并封装为CommandLine
对象;ProgramOptions
对象,是Job Command
参数的封装类,持有CommandLine
解析得来的参数;ProgramOptions
传入PackagedProgram
的构造函数,创建PackagedProgram
对象,PackagedProgram
负责具体具体调用UserJar
的Main
函数;CliFrontend.executeProgram
为任务执行上下文创建ExecutionEnvironmentFactory
和StreamExecutionEnvironmentFactory
对象,然后调用PackagedProgram.invokeInteractiveModeForExecution
方法反射调用UserJar
的main
函数,执行具体任务逻辑;2. UserJar
的任务入口函数UserMainClass::main(args: Array[String])
被调用后,会依次执行:
StreamExecutionEnvironment::getExecutionEnvironment
创建StreamExecutionEnvironment
和StreamContextEnvironment
对象;StreamExecutionEnvironment.addSource
创建DataStreamSource
对象,DataStreamSource
对象内持有上下文环境中的StreamExecutionEnvironment
和StreamTransformation
对象,StreamTransformation
中持有FlinkKafkaConsumer
对象;DataStream.map
创建OneInputTransformation
对象,其内部持有上游的StreamTransformation
和上线文中的StreamExecutionEnvironment
对象;最后将OneInputTransformation
添加到StreamExecutionEnvironment
的成员变量transformations
列表中;DataStream.addSink
创建DataStreamSink
对象,并将其添加到StreamExecutionEnvironment
的成员变量transformations
列表中;StreamExecutionEnvironment.execute
开始执行Job
创建和任务提交;3. StreamExecutionEnvironment.execute
在Client
端的代码执行流程:
调用StreamExecutionEnvironment.getStreamGraph
,先创建StreamGraphGenerator
对象,然后调用StreamGraphGenerator.generate
生成StreamGraph
,生成StreamGraph
流程如下:
Job
和StreamExecutionEnvironment
的配置以及上下文信息创建StreamGraph
对象;StreamExecutionEnvironment.transformations
,对每个StreamTransformation
进行解析;StreamTransformation
构建出StreamNode
并存放到StreamGraph
对象的成员变量Map<Integer, StreamNode> streamNodes
中,一个StreamNode
包含着一个FlinkOperator
和这个Operator
运行所需的参数/配置信息;StreamGraph.addEdge
,构建每个StreamNode
的Input StreamEdge
和Output StreamEdge
对象,分别添加到StreamNode
的成员变量inEdges
和outEdges
中;StreamEdge
中包含它上下游StreamNode
的Id
值,数据传递规则ShuffleMode
, StreamPartitioner
等信息;StreamExecutionEnvironment.execute(StreamGraph)
执行任务提交流程并等待任务状态返回;StreamExecutionEnvironment.executeAsync
内通过调用DefaultExecutorServiceLoader.getExecutorFactory
检索jar
s 的META-INF.services
目录,加载适合的ExecutorFactory
(Java SPI
),当前Job
可用的是YarnJobClusterExecutorFactory
;YarnJobClusterExecutorFactory
获取YarnJobClusterExecutor
,然后执行YarnJobClusterExecutor.execute
:
PipelineExecutorUtils.getJobGraph
将StreamGraph
转换为JobGraph
,转换的重要逻辑在StreamingJobGraphGenerator.createJobGraph
内,创建JobGraph
的主要操作有:创建一个包有 32 位随机数的JobID
;为Graph
的每个顶点生成一个全局唯一的hash
数(用户可通过DataStream.uid
设置);生成JobVertex
,它是Flink Task
的上层抽象,包含Operators
, invokableClass
, SlotSharing
,OperatorChaining
等信息,存放在JobGraph
的成员变量taskVertices
中;此外还有, ExecutionConfig
, SavepointConfig
, JarPath
s, Classpath
s 等信息;YarnClusterClientFactory.getClusterSpecification
从Configuration
中解析当前提交Job
的JobManager/TaskManager
的内存信息,用于校验Yarn Cluster
是否有足够的资源分配给Job
启动;YarnClusterDescriptor.deployJobCluster
执行具体的Job
提交流程,返回一个ClusterClientJobClientAdapter
对象,其内部通过RestClusterClient
对象与Yarn Cluster
通信,可获取Job
状态或是执行一些其它操作;4. 调用YarnClusterDescriptor.deployJobCluster
执行Job
提交:
调用YarnClusterDescriptor.deployInternal
代码逻辑阻塞直到JobManager
提交Yarn
完成,逻辑如下:
Kerberos
认证(如果有需要);Job
提交条件;yarn queue
;ClusterSpecification
(利用集群Yarn Container vCores/Memory
的配置);YarnClusterDescriptor.startAppMaster
启动AppMaster
(下文详解);startAppMaster
返回的JobManager
相关信息和applicationId
写入Configuration
;调用YarnClusterDescriptor.startAppMaster
:
Configuration
获取FileSystem
配置信息,然后从plugins
目录加载jar
s
初始化FileSystem
实例;Zookeeper Namespace
写入Configuration
对象,可以通过high-availability.cluster-id
配置,默认是applicationId
;YarnApplicationFileUploader
对象,然后将log4j.properties
, logback.xml
, flink-conf.yaml
, yarn-site.xml
, UserJars
, SystemJars
, PluginsJars
, JobGraph序列化
,kerberos配置/认证
等文件上传到hdfs:///user/$username/.flink/$applicationId/
目录下;JobManagerProcessUtils::processSpecFromConfigWithNewOptionToInterpretLegacyHeap
,从Configuration
获取Memory
配置并计算出JobManager
所在进程的Memory
分配数值,最终以-Xmx, -Xms, -XX:MaxMetaspaceSize, -XX:MaxDirectMemorySize
形式用到JVM
进程启动;此外,Memory
计算对旧版配置FLINK_JM_HEAP, jobmanager.heap.size, jobmanager.heap.mb
做了兼容处理;YarnClusterDescriptor.setupApplicationMasterContainer
创建ContainerLaunchContext
(启动Container
所需的信息集合);ApplicationName
, ContainerLaunchContext
, Resource
(向ResourceManager
申请资源), CLASSPATH
, Environment Variables
, ApplicationType
, yarn.application.node-label
, yarn.tags
等信息封装到ApplicationSubmissionContext
;此时,ApplicationSubmissionContext
就封装了ResourceManager
启动ApplicationMaster
所需的所有信息;提交任务失败
的回调函数,用于在提交任务失败后kill Application & delete Files that have been uploaded to HDFS
;YarnClientImpl.submitApplication
将任务提交给Yarn Client Api
处理;YarnClientImpl.getApplicationReport
等待提交任务提交成功(AppMaster
正常启动),最后返回任务状态ApplicationReport
;YarnClientImpl.submitApplication
内通过调用rmClient.submitApplication
向Yarn Client
提交Job
:
YarnClientImpl.rmClient
通过调用ConfiguredRMFailoverProxyProvider.getProxy
获取到,YarnClientImpl.rmClient
实例是ApplicationClientProtocolPBClientImpl
的代理对象,其内部通过ProtoBuf + RpcEngine
提交任务到Yarn Server
端;Yarn Server
端ClientRMService.submitApplication
会收到所有来自Yarn Client
的Job
提交请求,执行后序的AppMaster
启动操作;// Server端在Container中启动JobManager的流程
[1] -> YarnJobClusterEntrypoint::main(String[] args)
-> -> EnvironmentInformation.logEnvironmentInfo
-> -> SignalHandler.register
-> -> JvmShutdownSafeguard.installAsShutdownHook
-> -> YarnEntrypointUtils.logYarnEnvironmentInformation
-> -> YarnEntrypointUtils.loadConfiguration
-> -> ClusterEntrypoint::runClusterEntrypoint
[2] -> -> -> YarnJobClusterEntrypoint.startCluster
-> -> -> -> PluginUtils.createPluginManagerFromRootFolder
-> -> -> -> ClusterEntrypoint.configureFileSystems
-> -> -> -> ClusterEntrypoint.installSecurityContext
-> -> -> -> ClusterEntrypoint.runCluster
-> -> -> -> -> ClusterEntrypoint.initializeServices
-> -> -> -> -> YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory
-> -> -> -> -> DefaultDispatcherResourceManagerComponentFactory.create
-> -> -> -> -> -> JobRestEndpointFactory.createRestEndpoint -> MiniDispatcherRestEndpoint.start
-> -> -> -> -> -> YarnResourceManagerFactory.createResourceManager -> YarnResourceManager.start
-> -> -> -> -> -> ZooKeeperUtils.createLeaderRetrievalService -> ZooKeeperLeaderRetrievalService.start
-> -> -> -> -> -> DefaultDispatcherRunnerFactory.createDispatcherRunner
-> -> -> -> -> -> -> JobDispatcherLeaderProcessFactoryFactory.createFactory
-> -> -> -> -> -> -> DefaultDispatcherRunner::create
-> -> -> -> -> -> -> -> DispatcherRunnerLeaderElectionLifecycleManager::createFor
-> -> -> -> -> -> -> -> -> DispatcherRunnerLeaderElectionLifecycleManager::new -> ZooKeeperLeaderElectionService.start
-> -> -> -> -> -> -> -> -> -> ZooKeeperLeaderElectionService.isLeader
-> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherRunner.grantLeadership
-> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess
-> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherLeaderProcess.start -> AbstractDispatcherLeaderProcess.startInternal
-> -> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherLeaderProcess.onStart
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherGatewayServiceFactory.create
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> JobDispatcherFactory.createDispatcher (create AkkaServer)
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> MiniDispatcher.start (call AkkaServer to execute job start)
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.onStart
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultDispatcherBootstrap.initialize -> AbstractDispatcherBootstrap.launchRecoveredJobGraphs
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.runRecoveredJob
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> Dispatcher.runJob
// 调用链太长,从`Dispatcher.runJob`新建
[3] -> Dispatcher.runJob
-> -> Dispatcher.createJobManagerRunner
-> -> -> DefaultJobManagerRunnerFactory.createJobManagerRunner
-> -> -> -> JobManagerRunnerImpl::new
-> -> -> -> -> DefaultJobMasterServiceFactory.createJobMasterService
-> -> -> -> -> -> JobMaster::new
-> -> Dispatcher.startJobManagerRunner
[4] -> -> -> JobManagerRunnerImpl.start
-> -> -> -> ZooKeeperLeaderElectionService.start -> ZooKeeperLeaderElectionService.isLeader
-> -> -> -> -> JobManagerRunnerImpl.grantLeadership -> JobManagerRunnerImpl.verifyJobSchedulingStatusAndStartJobManager -> JobManagerRunnerImpl.startJobMaster
-> -> -> -> -> -> ZooKeeperRunningJobsRegistry.setJobRunning
-> -> -> -> -> -> JobMaster.start
-> -> -> -> -> -> -> JobMaster.startJobExecution
-> -> -> -> -> -> -> -> JobMaster.setNewFencingToken
-> -> -> -> -> -> -> -> JobMaster.startJobMasterServices
-> -> -> -> -> -> -> -> JobMaster.resetAndStartScheduler
-> -> -> -> -> -> -> -> -> DefaultJobManagerJobMetricGroupFactory.create
-> -> -> -> -> -> -> -> -> JobMaster.createScheduler -> DefaultSchedulerFactory.createInstance -> DefaultScheduler::new
-> -> -> -> -> -> -> -> -> JobMaster.startScheduling
-> -> -> -> -> -> -> -> -> -> SchedulerBase.registerJobStatusListener
-> -> -> -> -> -> -> -> -> -> SchedulerBase.startScheduling
-> -> -> -> -> -> -> -> -> -> -> SchedulerBase.registerJobMetrics
-> -> -> -> -> -> -> -> -> -> -> SchedulerBase.startAllOperatorCoordinators
-> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.startSchedulingInternal
-> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.prepareExecutionGraphForNgScheduling
-> -> -> -> -> -> -> -> -> -> -> -> EagerSchedulingStrategy.startScheduling -> EagerSchedulingStrategy.allocateSlotsAndDeploy // stream job use EagerSchedulingStrategy as default Scheduler
-> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.allocateSlotsAndDeploy // explain ExecutionVertexID
[5] -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.allocateSlots -> DefaultExecutionSlotAllocator.allocateSlotsFor -> DefaultExecutionSlotAllocator.allocateSlot -> NormalSlotProviderStrategy.allocateSlot
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> SchedulerImpl.allocateSlot -> SchedulerImpl.allocateSlotInternal -> SchedulerImpl.internalAllocateSlot -> SchedulerImpl.allocateSingleSlot -> SchedulerImpl.requestNewAllocatedSlot
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> SlotPoolImpl.requestNewAllocatedBatchSlot -> SlotPoolImpl.requestNewAllocatedSlotInternal -> SlotPoolImpl.requestSlotFromResourceManager
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> ResourceManager.requestSlot -> SlotManagerImpl.registerSlotRequest -> SlotManagerImpl.internalRequestSlot -> SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot -> SlotManagerImpl.allocateResource
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> ResourceActionsImpl.allocateResource -> YarnResourceManager.startNewWorker -> YarnResourceManager.requestYarnContainer -> AMRMClientAsyncImpl.addContainerRequest
[6] -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultScheduler.waitForAllSlotsAndDeploy -> DefaultScheduler.deployAll -> DefaultScheduler.deployOrHandleError -> DefaultScheduler.deployTaskSafe
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> DefaultExecutionVertexOperations.deploy -> ExecutionVertex.deploy -> Execution.deploy
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> TaskDeploymentDescriptorFactory.createDeploymentDescriptor
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> RpcTaskManagerGateway.submitTask
-> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> -> TaskExecutor.submitTask [invoke TaskExecutor via RPC which created by calling JobMaster.registerTaskManager]
1. 在Per-job
模式下,AppMaster
进程的启动入口是YarnJobClusterEntrypoint.main
:
EnvironmentInformation::logEnvironmentInfo
打印环境变量信息;SignalHandler::register
注册TERM, HUP, INT
等终止/退出信号处理(例如:kill
);System::getEnv
获取变量PWD
,该目录为进程的启动目录,目录下包含进程启动所需的jar, config, job.graph, launch_container.sh
等文件(通过ln
创建的链接文件),PWD
目录位置:${yarn.nodemanager.local-dirs}/usercache/hadoop/appcache/${applicationId}/${containerId}
;YarnEntrypointUtils::logYarnEnvironmentInformation
打印Yarn
相关信息;YarnEntrypointUtils.loadConfiguration
从flink-conf.yaml
和System.env
构建Configuration
对象;YarnJobClusterEntrypoint
对象,并为当前进程添加shutDownHook
(在进程退出前进行删除本地文件的操作);ClusterEntrypoint::runClusterEntrypoint
,函数内部通过调用YarnJobClusterEntrypoint.startCluster
做AppMaster
启动操作,然后为YarnJobClusterEntrypoint
对象注册了监控Termination
的回调函数,用于打印进程结束的exit code
等信息;2. 调用YarnJobClusterEntrypoint.startCluster
,依次执行:
PluginUtils::createPluginManagerFromRootFolder
,将plugin
的name, jars
填充到PluginDescriptor
,然后将PluginDescriptor
和委托给parent of plugin.classLoader
加载的包名列表
封装到DefaultPluginManager
;ClusterEntrypoint.configureFileSystems
内部通过Java SPI
去加载并初始化所有jars
(包括common/plugin jars
)的META-INF/services/
目录下的FileSystemFactory
服务;ClusterEntrypoint.installSecurityContext
,创建ZooKeeperModule, HadoopModule, JaasModule
和HadoopSecurityContext
对象并存放在SecurityUtils
的成员变量,然后通过创建的HadoopSecurityContext
对象触发执行ClusterEntrypoint.runCluster
;从代码阅读来看:installSecurityContext
的目的在于向运行环境添加必要的用户权限
和环境变量
配置;ClusterEntrypoint.runCluster
执行流程:
ClusterEntrypoint.initializeServices
初始化多个服务模块;
commonRpcService
:AkkaRpc
服务,处理本地和远程的服务调用请求;haServices
:ZooKeeperHaServices
服务,处理与zookeeper
的交互请求,如JobManager/ResourceManager
等组件的leader
选举;blobServer
:处理Job BLOB文件
的/上传/下载/清除等操作,BLOB
包括Jar
s,RPC消息
等内容;heartbeatServices
:管理Job
各组件的心跳服务;metricRegistry
:监控指标注册,持有Job
各监控指标和MetricReporter
s 信息;processMetricGroup
:系统/进程监控指标组,包含CPU
, Memory
, GC
, ClassLoader
, Network
, Swap
等监控指标;archivedExecutionGraphStore
:用于保存序列化的ExecutionGraph
?YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory
创建工厂对象DefaultDispatcherResourceManagerComponentFactory
,工厂对象用于创建DispatcherResourceManagerComponent
对象,该对象的创建过程是Job
启动的核心逻辑所在;另外提一下,FileJobGraphRetriever
将本地文件job.graph
反序列化为JobGraph
;DefaultDispatcherResourceManagerComponentFactory.create
执行流程:
Flink-Web-Dashboard
的Rest
接口服务WebMonitorEndpoint
;YarnResourceManager
以及其内部服务SlotManager
,负责Job
的资源管理,如:申请/释放/记录;MiniDispatcher
,在Dispatcher
启动过程中会创建JobManager
线程完成任务的启动;ResourceManager
, Dispatcher
的监听协调服务,用于服务的容错恢复,通过在Zookeeper
中保持了一个锁文件来协调服务;WebMonitorEndpoint, YarnResourceManager, MiniDispatcher
三个服务在ZooKeeper
下保存着各自的相关信息,通过ZooKeeperHaServices
保证服务的高可用;三个服务的启动过程是通过AkkaRPC + 状态机
的设计模式实现的,在各服务对象创建过程中注册生成AkkaServer
,在服务启动过程中通过AkkaRPC
调用不同状态机函数
,最后回调onStart
执行实际的启动逻辑,完整的启动逻辑较为复杂,需耐心翻阅;状态机默认状态是:StoppedState.STOPPED
,AkkaServer
创建逻辑在AkkaRpcService.startServer
内,另外,在AkkaRpcService.startServer
内通过调用AkkaRpcService.registerAkkaRpcActor
创建AkkaRpcActor
,AkkaRpcActor
为具体接收消息并执行状态机
逻辑的入口,接收消息的处理逻辑在AbstractActor.createReceive
内;3. 调用Dispatcher.runJob
进入JobManager
启动流程:
Dispatcher.createJobManagerRunner
创建JobManagerRunnerImpl
, JobMaster
对象,JobMaster
代表了一个运行的JobGraph
,在JobMaster::new
过程中创建了RpcServer
, DefaultScheduler
, SchedulerImpl
, SlotPoolImpl
, ExecutionGraph
, CheckpointCoordinator
等重要对象,创建ExecutionGraph
入口:SchedulerBase.createExecutionGraph
;Dispatcher.startJobManagerRunner
执行整个任务的提交流程:启动JobManager
端各服务模块,申请Slot
资源启动TaskManager
,提交/执行Task
等流程;4. 从调用JobManagerRunnerImpl.start
开始到DefaultScheduler.allocateSlotsAndDeploy
结束
Job
状态的变更和监听服务的添加,为JobManager
添加监控指标;代码较简单,在此不一一说明;5. 调用DefaultScheduler.allocateSlots
进入资源申请流程:
// TODO
find slot from existing resource, if don‘t find request resourceManager for a new resource
AMRMClientAsyncImpl.CallbackHandlerThread // get Resource that have been allocated and call YarnResourceManager.onContainersAllocated
to process TaskManager
AMRMClientAsyncImpl.HeartbeatThread // invoke AMRMClientImpl.allocate
periodically, ResourceManager will be called to allocate Resource
YarnResourceManager.requestYarnContainer // put request of allocate Container to member variable AMRMClientImpl.ask
6. 调用DefaultScheduler.waitForAllSlotsAndDeploy
进入Task
发布流程
// TODO
获取到的 container 资源存放在纳贡,以供给后序 slot 分配; start TaskManager and register it to JobManager
// TODO
公司领导在 19 年底确定将所有服务迁移到某云服务,我所在团队负责的两条产品线需在六月前完成迁移;这两条产品线承载着公司 80%数据收集和报表计算&查询服务,指标较多且系统复杂,迁移压力略大;从 20 年初开始制定迁移方案,进行实际的迁移操作,整个迁移在团队 4 个人的全职投入和在 DBA、基础组件同事的协助下最终在 5 月中旬完成迁移。本文目的在于简单记录下自己在迁移过程中所遇到的一些技术问题。
两条产品线主要功能有数据收集,实时数据处理,离线数据处理,指标查询四类,涉及到的需迁移的技术组件如下:
Collector(自研,Go 实现)
;Kafka
, Flink
, Etl-formwork(自研 Pipeline)
;Crontab + Hive + Python2
;Druid
, Hadoop
, Cassandra
, Mongo
, ES
, Mysql
, Redis
;Zookeeper
,Jetty
, Dubbo
, Grafana
, Prometheus
;Nginx
, VIP
等;在整个迁移过程中自己主要负责:实时/离线计算任务的迁移,实时/历史数据同步,指标核对工具的开发,保证迁移后各指标的准确无误;需迁移技术组件包括:Kafka
, Flink
, Zookeeper
, Dubbo
, Crontab + Hive + Python
;
对Dubbo
进行升级操作的原因:新环境中的调用Dubbo
服务耗时较长,导致实时数据处理出现淤积的情况,在 Github
找到一个类似的issue,随后对Dubbo
进行了升级操作,问题得以解决;
Dubbo 2.7.6
基本可以向后兼容性2.5.3
,只需升级Dubbo
依赖版本,添加Curator
依赖,然后将用户代码引入的 Dubbo
类的路径变更为com.apache
即可,如:com.alibaba.dubbo.config.ApplicationConfig
变更为org.apache.dubbo.config.ReferenceConfig
;
服务升级后发现在运行日志中有如下报错:ERROR org.apache.dubbo.qos.server.Server - [DUBBO] qos-server can not bind localhost:22222
,直接在Dubbo
配置中关闭QOS
服务即可;
您没看错,确实是Hive 0.13
,且Hive SQL
提交到一个Hadoop 0.20
的集群上~#.#~;这些批脚本对应的是一条产品线的离线指标计算的业务,这些脚本包含:Shell
,Python
,Hive SQL
等脚本,并依赖了一个Mongo
库,一个Jetty
服务,三个Mysql
库,八个User Jar
,几十个UDF
;本次迁移必须对Hadoop/Hive
做升级,且将脚本的业务逻辑梳理清楚;这些批脚本的迁移大概用了 1 个月的时间;
从0.13
升级到1.2.1
过程十分顺利,在自己定义的 HiveCli
初始化脚本中添加一行配置(set hive.support.sql11.reserved.keywords=false;
)即可;主要是因为Hive
从1.2.0
版本开始,根据SQL2011
标准增加了大量reserved keyword
,通过这个配置可以保证包含reserved keyword
的 SQL 可正常执行;参考:
最初迁移时是计划合并多个Hadoop
集群到公司公用的Hadoop 3.1.0
集群,对应的 Hive 版本也升级到3.1.1
,但最终因牵动项目多(Druid
, Etl-Framework
),升级风险高而放弃;在Hive
的0.13
版本升级到3.1.1
的测试阶段也解决了多个 SQL 迁移问题,在此做下记录。
hive.support.sql11.reserved.keywords 配置不可用
在Hive 2.3
之后配置set hive.support.sql11.reserved.keywords=false;
已经被移除,所以在 SQL 中有用到reserved keyword
做标识符时需要放在反引号(``)内,以消除歧义;社区 issue: https://issues.apache.org/jira/browse/HIVE-14872。
Union 两张字段类型不一致的表
Hive 3.1.1
环境下当union all
两个字段类型不一致的列时,会收到如下报错信息:FAILED: SemanticException 17:71 Schema of both sides of union should match: Column user_offset is of type string on first table and type int on second table. Error encountered near token 'charge_device'
;看起来是Hive3
对union all
两侧的数据集进行了类型一致性校验;对类型不一致的列进行CAST(user_offset as int)
修改后 SQL 可正常执行;
hive.strict.checks.type.safety
严格类型安全检查,默认是 true,该属性不允许 bigint 和 string 间的比较
,bigint 和 double 间的比较
;将属性设置为 false,可以解除不允许上述两种不同类型间的比较的限制,在 SQL 的Where
条件中经常会出现这种类型不一致的条件比较。
SQL 执行耗时增加:
Hive 3.1.1
环境下发现个 SQL 执行特别慢,而同样 SQL 在 Hive0.13 都可以很快执行完成;经过进一步测试发现在Hive3
下,查询同一张表的不同字段(如下文提供的 SQL),性能差别很大,在单表数据量40W+
的测试条件下,查询某个字段要用 2 个多小时,而查询其他字段只需3~5分钟
;同样数据集同样的 SQL 以相同并行度在Hive 0.13
下执行,查询时长都可保持在3~5分钟
;在修改表数据存储格式(STORED AS PARQUET
)后查询慢的问题可以解决,通过hive --debug
进行远程Debug
发现Hive3
对数据反序列化阶段变代码变化较大,但没找到问题根源,先留下个问题和数据,以后再结合JMX
, Arthas
这些工具看个究竟吧,示例:
-- 表结构:
CREATE TABLE `tmp_logout`(
`user` map<string,string> COMMENT 'user_info',
`device` map<string,string> COMMENT 'device_info',
`app` map<string,string> COMMENT 'app_info',
`event` struct<eventType:string,attribute:map<string,string>,eventDatas:array<struct<key:string,value:string,type:string>>> COMMENT 'event_info')
PARTITIONED BY (`job_time` bigint, `timezone` int)
STORED AS TEXTFILE;
-- sql_1:执行速度快
insert overwrite directory '/user/hadoop/output/tmp_logout'
select app['product_id'] as pid from tmp_logout where job_time=20200420120000 and app['product_id'] is not NULL;
--sql_2:刚开始执行较快,但随时间增加执行越来越慢,最终执行完需要2H+,GC时间逐渐增加
insert overwrite directory '/user/hadoop/output/logout'
select event.attribute['event_time'] as event_time from tmp_logout where job_time=20200420120000 and app['product_id'] is not NULL;
MR
切换到Spark
上时,也发现了很多 SQL 执行报错的问题,比如:Hive UDAF
执行报错;在 Hive 配置set hive.groupby.skewindata=true;
的情况下,有些group by
的 SQL 的执行报错(经Calcite
优化后的执行计划不一样),等等;总之,需调整的的地方还是挺多的。用Flink
写了个Kafka To Kafka
数据拷贝工具,由于迁移前期两个机房之间的网络不太稳定,为防止数据拷贝任务频繁重启,给Flink Job
添加了 checkpoint 失败不重启的配置:env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
,这个配置导致了数据重复消费的问题;
现象: Checkpoint
失败持续了 6 个小时之久且没一次成功;查看日志发现每次 checkpoint 时候都会有如下报错信息:java.lang.IllegalStateException: Correlation id for response (204658) does not match request (204657)
;FlinkKafkaConsumer09
的 Offset 提交模式是OffsetCommitMode.ON_CHECKPOINTS
;查看Kafka-Manager
的Consumer Offset
监控,发现一直没变化;重启 Job 发现 6 小时内的数据出现重复消费的问题。
解决: 在社区看到一个相同的问题:https://issues.apache.org/jira/browse/KAFKA-4669;后来关闭了测试数据拷贝并将流量较大的几个Topic
切到kafka-mirror-maker.sh
上,问题得以暂时解决;当前 BUG 并未根除,当前问题也先遗留下来后序再看看源码吧。
在升级 JDK 版本后,业务代码调用 ES 接口时报错,相关信息如下:
// 调用 ES API 的用户代码
String value = String.valueOf(hits[0].field(collFields[i]).getValue());
// SearchHitField.java 被调用的方法
<V> V getValue();
// 报错信息:java.lang.ClassCastException: java.lang.Object cannot be cast to [C
// 用户代码反编译结果:
// String value = String.valueOf((char[]) hits[0].field(collFields[i]).getValue());
// 正确写法
Object obj = hits[0].field(collFields[i]).getValue();
String value = String.valueOf(obj);
从上述信息我们可以判断,当前这个报错是因为:在编过程中,用户代码上下文没有足够的信息给到编译器进行泛型类型的推断;贴一个文章作为学习参考:http://lovestblog.cn/blog/2016/04/03/type-inference/?from=groupmessage;
公司机房有高 IO 需求的服务如:Druid Historical, Kafka, Cassandra, ES, Mysql, etc.
都是用的12块HDD做Raid5
;而到云服务是单独的磁盘阵列,能加多块盘,但没办法做Raid5
,单块磁盘读写上限限制到80M/s
,此外:听说磁盘和主机是分别存放并通过光纤链接到一起的;这使得前期压测花费了不少时间在磁盘性能测试上,在整个迁移过程中也做了几次的磁盘参数调整,或是更换高性能 SSD 的工作;
Mysql
, Redis
, Cassandra
这些服务是通过建立跨机房集群来完成实时数据迁移的,对跨机房的网络稳定性要求较高,期间由于大量历史数据跨机房拷贝导致了网络堵塞,这影响到了这些跨机房服务的正常运行;
Druid
, Hadoop
, ES
等服务是通过在云上新装集群,然后通过distcp
, elasticdump
命令将历史数据跨机房拷贝到云服务上的;
刚开始时,机房间的通信用是一根 10G 光纤,所有团队服务都走这一根光纤,互相会有影响,后又拉了根 5G 光纤;
Kafka
集群在负载不高的情况下,上游数据写入出现淤积的情况,运维同事帮忙调整了下网卡参数(ethtool -L eth0 combined 2
, ethtool -l eth0
)并重启机器后,淤积情况有所缓解;
Kafka 0.8.1
版本问题较多,迁移到了Kafka 0.9
集群,Kafka0.8.1
遇到的问题有:recovery threads
只有一个线程且不能配置;删除Topic
会导致Broker
的报错日志激增,必须手动清楚ZK
上的Delete Topic
信息,并重启所有Broker
;auto.leader.rebalance.enable=false
是默认配置,会导致Broker
压力不均衡,需修改为true
;
在Flink的日常使用过程中总会遇到一些值得记录的问题,有些问题复杂度不高,不必要用单独的篇幅记录,就将这些问题都汇总到当前这篇文内。
环境: Flink 1.7.1
+ Hadoop 3.1.0(新IDC机房)
+ Kafka 0.9(云机房)
背景: 公司决定将现有机房迁移到某云+新的IDC机房
;Hadoop 3.1.0
公共大集群,Kafka 0.9
部署在团队自己的云主机
上;
Job描述: 实时ETL,Flink读取Kafka数据经过处理后写入ES或Kafka等存储,State Operator为FlinkKafkaConsumer
,checkpoint state为kafka consumer offset
,每个state大小在7~10K
之间;
异常信息:
2020-01-02 17:18:53,337 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: analytics_standarddata_weapp_share-druid -> XXXXXXXXXX (1/1) (c2e77749126a67db62d38bdb166b5696) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 60 for operator Source: analytics_standarddata_weapp_share-druid -> XXXXXXXXXX (1/1).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 60 for operator Source: analytics_standarddata_weapp_share-druid -> XXXXXXXXXX (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/saas/app/flink/checkpoints/1a3bc71ac57f33961637adf340e9c28f/chk-60/253fc668-9241-41c7-994d-3e72aa7281ef in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/saas/app/flink/checkpoints/1a3bc71ac57f33961637adf340e9c28f/chk-60/253fc668-9241-41c7-994d-3e72aa7281ef in order to obtain the stream state handle
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
......
异常说明:
每隔10~30min
就会出现一次state checkpoint failed.
,进而导致Job重启(checkpoint默认策略,可配置),Job Log
中发现上述错误信息;
异常排查:
icmp_seq=105 ttl=58 time=2.25 ms 11:09:05
),没有网络波动出现;consumed offset
和write state to HDFS
;FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(...)
,阅读源码,了解具体操作;FsCheckpointStateOutputStream.closeAndGetHandle(...)
操作很简单,就是将state从byte[]写入到HDFS File
,中间没有多余的操作;FsCheckpointStateOutputStream.closeAndGetHandle(...)
到抛出异常的总耗时,用来定位是否存在写HDFS慢的情况,进而导致整个checkpoint failed
;system timestamp
,在异常信息上添加这些打点的timestamp信息;cd flink-runtime/
,mvn clean compile
,得到编译好的FsCheckpointStreamFactory$FsCheckpointStateOutputStream.class
文件;FsCheckpointStreamFactory$FsCheckpointStateOutputStream.class
,操作指南:
# step 1:将flink-dist_2.11-1.7.1.jar中的文件解压出来:
> jar -xvf flink-dist_2.11-1.7.1.jar org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory\$FsCheckpointStateOutputStream.class
# step 2:用新的.class覆盖刚解压出来的
> cp -f <new_class_file> org/apache/flink/runtime/state/filesystem/
# step 3:将新的.class文件加入到flink-dist_2.11-1.7.1.jar
> jar -uvf flink-dist_2.11-1.7.1.jar org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory\$FsCheckpointStateOutputStream.class
Could not flush and close the file system output stream to hdfs:/saas/app/flink/checkpoints/cbf5c27f63a7a4724538f3fd9f2ef551/chk-363/01fc22d3-f24b-40c1-9728-cc50dc2e14d2 in order to obtain the stream state handle, enter timestamp:1578025396026, failed timestamp:1578025433855, duration: 37829
;方法总执行时间为37_829ms
,远超过checkpoint timeout设置10_000ms
;到此,可以初步判定checkpoint异常和写HDFS耗时过长有关,但并不清楚每隔10~30min
就出现一次写HDFS慢的原因;Directory Scan
耗时太长造成的,HDFS Log:
DirectoryScanner
属于一个Hadoop版本缺陷,具体可查看jira:https://issues.apache.org/jira/browse/HDFS-14476dfs.datanode.directoryscan.interval
,修改job checkpoint相关配置(setCheckpointTimeout/setFailOnCheckpointingErrors/etc.),然而这些都不治本,最好还是Hadoop打补丁,但又没那么快;2E
个文件,略微有点多惹~~~本文主要是记录一个非常简单的Flink Job
在从Standalone迁移OnYarn时所遇到的一个因内存占用超出限制而引发的Container频繁被Yarn Kill的问题。问题的解决过程主要经历了:Flink监控指标分析,GC日志的排查,TaskManger内存分析,Container的内存计算方法,栈内存的分析等内容;
起因: 希望将Flink Standalone
上一个简单的Job迁移到Flink On Yarn
,迁移前的版本为Flink 1.3.2
,迁移目标版本为Flink 1.7.1 + Hadoop 3.1.0
;
Job描述:
Kafka Topic
,每个Topic的Partition数为21;ES Index
,每个Index对应一个Topic;flink run -m yarn-cluster -yn 11 -ys 2 -ytm 6g -ynm original-data-filter-prod -d -yq XXX.jar 21
graph LR
A(Source: KafkaSource) --> B(Filter: Operator)
B --> C(Sink: Es Index)
source/filter/sink
的并行度一致,以保证Operator Chain
10 Topic * 21 parallelism = 210
,在考虑到Slot Sharing
情况下每个Container内运行的Task数为:10个20,1个10
;异常描述:
Job正常启动后TaskManager所在的Container每3~5min会被Yarn Kill掉,然后ApplicationMaster会重新向Yarn申请一个新的Container以启动之前被Kill掉的,整个Job会陷入Kill Container
/Apply For New Container
/Run Task/ Kill Container...
的循环,在Flink和Yarn的日志里都会发现如下错误信息:
2019-11-25 20:12:41,138 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_e03_1559725928417_0577_01_001065 because: [2019-11-25 20:12:36.159]Container [pid=97191,containerID=container_e03_1559725928417_0577_01_001065] is running 168853504B beyond the 'PHYSICAL' memory limit. Current usage: 6.2 GB of 6 GB physical memory used; 9.8 GB of 60 GB virtual memory used. Killing container.
Dump of the process-tree for container_e03_1559725928417_0577_01_001065 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 97215 97191 97191 97191 (java) 87864 3747 10359721984 1613717 /usr/java/default/bin/java -Xms4425m -Xmx4425m -XX:MaxDirectMemorySize=1719m -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+ParallelRefProcEnabled -XX:ErrorFile=/home/hadoop/apache/hadoop/latest/logs/userlogs/application_1559725928417_0577/container_e03_1559725928417_0577_01_001065/hs_err_pid%p.log -Xloggc:/home/hadoop/apache/hadoop/latest/logs/userlogs/application_1559725928417_0577/container_e03_1559725928417_0577_01_001065/gc.log -XX:HeapDumpPath=/home/hadoop/apache/hadoop/latest/logs/userlogs/application_1559725928417_0577/container_e03_1559725928417_0577_01_001065 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+HeapDumpOnOutOfMemoryError -Dlog.file=/home/hadoop/apache/hadoop/latest/logs/userlogs/application_1559725928417_0577/container_e03_1559725928417_0577_01_001065/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
|- 97191 97189 97191 97191 (bash) 0 0 118067200 371 /bin/bash -c /usr/java/default/bin/java -Xms4425m -Xmx4425m -XX:MaxDirectMemorySize=1719m -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+ParallelRefProcEnabled -XX:ErrorFile=/home/hadoop/apache/hadoop/latest/logs/userlogs/application_1559725928417_0577/container_e03_1559725928417_0577_01_001065/hs_err_pid%p.log -Xloggc:/home/hadoop/apache/hadoop/latest/logs/userlogs/application_1559725928417_0577/container_e03_1559725928417_0577_01_001065/gc.log -XX:HeapDumpPath=/home/hadoop/apache/hadoop/latest/logs/userlogs/application_1559725928417_0577/container_e03_1559725928417_0577_01_001065 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+HeapDumpOnOutOfMemoryError -Dlog.file=/home/hadoop/apache/hadoop/latest/logs/userlogs/application_1559725928417_0577/container_e03_1559725928417_0577_01_001065/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> /home/hadoop/apache/hadoop/latest/logs/userlogs/application_1559725928417_0577/container_e03_1559725928417_0577_01_001065/taskmanager.out 2> /home/hadoop/apache/hadoop/latest/logs/userlogs/application_1559725928417_0577/container_e03_1559725928417_0577_01_001065/taskmanager.err
[2019-11-25 20:12:36.175]Container killed on request. Exit code is 143
[2019-11-25 20:12:38.293]Container exited with a non-zero exit code 143.
看到上述异常信息,第一直觉是heap或off-heap的Memory占用过大,应该先分析下Container的内存占用情况。
信息:
-Xms4425m -Xmx4425m -XX:MaxDirectMemorySize=1719m
;# defined configuration
taskmanager.heap.size: 2048m # 默认为2G,当前Job中被覆盖为6G,指定Container Memory(heap+off-heap);
taskmanager.memory.preallocate: true # Whether pre-allocated memory of taskManager managed
containerized.heap-cutoff-min: 900 # 安全边界,从Container移除的最小 Heap Memory Size
containerized.heap-cutoff-ratio: 0.2 # 移除的Heap Memory的比例,用于计算Container进程的heap和max off-heap的大小,max off-heap越大留给Flink之外服务的Memory越多,这些空间一般被用作Container,Socket通信或是一些堆外Cache等;
Flink Web Dashboard
中heap/off-heap的监控如下图:
[Eden: 642.0M(642.0M)->0.0B(676.0M) Survivors: 32.0M->26.0M Heap: 3893.9M(4426.0M)->3245.6M(4426.0M)]
分析:
通过Flink Log
和Web Dashborad
查看到上述四项有关Container启动/运行时的信息,发现heap和off-heap都在合理的范围(有足够的空闲Memory),但heap占用的Memory一直在3.8G
左右,感觉上这个值属正常,但仍有侥幸心理:先降低heap占用试下。这时做了一次Flink配置修改:taskmanager.memory.preallocate: false
,这个配置主要是针对TaskManager Managed Memory的,与之相关的还有taskmanager.memory.off-heap
, taskmanager.memory.fraction
等,详细说明见:Flink Configuration Doc。
修改配置后重启Job,正常运行了几分钟后问题又再次出现,Container被Kill前最后一次GC Log:[Eden: 192.0M(192.0M)->0.0B(194.0M) Survivors: 28.0M->26.0M Heap: 2629.3M(4426.0M)->250.2M(4426.0M)]
,从Web Dashboard
中观察到的最大heap used
是3.7G
,off-heap used
是1.3G
。
思考:
heap/off-heap memory
在任意时间的实际使用量都在-Xmx4425m -XX:MaxDirectMemorySize=1719m
范围内(而且有足够的空闲空间)变化,同时Xmx + MaxDirectMemorySize = 6G
,所以任何时间的Memory使用:heap + off-heap < 6G
;另一方面:若heap + off-heap > 6G
,则应该抛出OOM的异常,所以判定当前问题与Heap/Off-heap的实际占用没有直接关系。接下来有两个疑问:1. Container Memory的计算逻辑是什么,使得计算结果出现大于6G的情况?2. 一个JVM进程包括:Heap, Off-heap, Non-heap, Stack等区域,前三个在Web Dashboard
可以看到并且内存总和不大于6G,而Stack内存占用并不能直接观察到,当前问题是否由Stack占用的Memory过大而引起?
现在我们不但可以确定当前问题与Heap/Off-heap没有直接的因果关系,而且下一步的分析方向可以明确了。
1. Container Memory的计算逻辑:
Physical Memory
和Virtual Memory
是否超出限制,默认检查周期:3s,代码逻辑可查看:MonitoringThread。Container Memory
的内存量计算有ProcfsBasedProcessTree
和CombinedResourceCalculator
两种实现,默认情况下使用ProcfsBasedProcessTree
。ProcfsBasedProcessTree
通过Page数量 * Page大小
来计算Physical Memory
的使用量,其中Page数量从文件/proc/<pid>/stat
解析获得,单Page大小通知执行shell命令getconfg PAGESIZE
获取,代码截图如下:
2. Stack Memory的分配区域:
由于JVM功底并不深,初听Stack Memory
具体在哪个区域分配,还真不能准确回答~~。JVM规范定义:Each Java Virtual Machine thread has a private Java Virtual Machine stack, created at the same time as the thread. A Java Virtual Machine stack stores frames (§2.6). A Java Virtual Machine stack is analogous to the stack of a conventional language such as C: it holds local variables and partial results, and plays a part in method invocation and return. Because the Java Virtual Machine stack is never manipulated directly except to push and pop frames, frames may be heap allocated. The memory for a Java Virtual Machine stack does not need to be contiguous.
。这段定义并没对Stack Memory的分配做明确定义,只有两句线索:frames may be heap allocated
和stack does not need to be contiguous
,各JVM产品可对Stack Memory
的分配做各自的灵活实现。对自己来说C++暂时是座难翻越的山,也不必要那么兴师动众~~~,直接上测试Code看吧:
/**
* jvm env: Oracle JDK-8
* jvm conf:
* -Xmx200m -Xms200m -Xss5m -XX:MaxDirectMemorySize=10m -XX:NativeMemoryTracking=detail
*/
public static void main(String[] args) {
List<Thread> list = new ArrayList<>(10000);
int num = 500;
while (num-- > 0) {
Thread thread = new Thread(() -> method(0));
thread.start();
list.add(thread);
}
list.forEach(thread -> {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
private static void method(long i) {
int loop = 1024 * 16;
if (i < loop) {
method(++i);
} else {
try {
System.out.println(i);
Thread.sleep(60 * 60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
查看测试DEMO的内存占用,执行Command:jcmd <pid> VM.native_memory scale=MB
结果如下:
Native Memory Tracking:
Total: reserved=4028MB, committed=2766MB
- Java Heap (reserved=100MB, committed=100MB)
(mmap: reserved=100MB, committed=100MB)
- Class (reserved=1032MB, committed=5MB)
(classes #860)
( instance classes #748, array classes #112)
(mmap: reserved=1032MB, committed=5MB)
( Metadata: )
( reserved=8MB, committed=4MB)
( used=1MB)
( free=4MB)
( waste=0MB =0.00%)
( Class space:)
( reserved=1024MB, committed=1MB)
( used=0MB)
( free=0MB)
( waste=0MB =0.00%)
- Thread (reserved=2575MB, committed=2575MB)
(thread #529)
(stack: reserved=2573MB, committed=2573MB)
(malloc=2MB #2663)
(arena=1MB #1057)
- Code (reserved=242MB, committed=7MB)
(mmap: reserved=242MB, committed=7MB)
- GC (reserved=56MB, committed=56MB)
(malloc=21MB #1143)
(mmap: reserved=36MB, committed=36MB)
- Internal (reserved=3MB, committed=3MB)
(malloc=3MB #15173)
- Symbol (reserved=1MB, committed=1MB)
(malloc=1MB #1352)
- Native Memory Tracking (reserved=1MB, committed=1MB)
- Shared class space (reserved=17MB, committed=17MB)
(mmap: reserved=17MB, committed=17MB)
结论: 从Native Memory Tracking
可以看到Thread Stack Memory
占用了2573MB
,而heap/off-heap
的Memory占用非常低;所以,Stack Memory
的分配是在heap/off-heap
之外的区域(虽然暂时不完全了解Stack Memory的分配细节)。此时,我们可以合理怀疑当前问题是由Stack Memory
占用过多引起的。
补充一张测试DEMO的Memory监控(非必须),可以简单的关注下Heap/GC情况:
Container的实际线程情况:
从VisualVM的线程监控中可以看到一个Container进程内有1734个Live Thread
和1622个Daemon Thread
,对于一个只有2C6G
的进程来讲,线程数太多,不仅会影响到Memory使用,也会给GC/线程上下文切换带来更大的压力;在所有活跃线程里有1200+
个是es transport client
,占总活跃线程的70%+
,如下图:
到此,可得出解题方案,即: 减少Container内的ES Sink
实例个数以达到降低es transport client
线程数量的目的。将ES Sink
的并行度由21调整为2(写数据到ES速度大概为1200+条/s,2个并行度可以满足需求),重启Job后运行正常,当前问题得到完美解决,调整后的执行计划如下图:
内存组成: 除去上述提到的内存使用外还有socket send/receive buffer也占用了一部分空间。
快速失败: 当前Yarn或Flink中没有可用的配置来指定Kill Container
或allocate Container
的最大次数,以达到在超过某个限制时候,让Job快速失败的目的;Flink中有yarn.maximum-failed-containers
配置,当并不适用当前场景;还是需要自己写脚本来完成类似的功能;贴一个反复申请Container的截图:
胡思乱想: Flink的Task之间不能共享Operator,Operator也不会有单例的实现,唯一存在的是并行度和Task分配运行策略;若当前问题中一个Container进程内多个Task之间可用共享一个单例的ES Sink
多好~~~,这样就可以少一层物理机间的数据传递。
注意小心: Slot Sharing是个很好的特性,可以很大程度提高资源利用率,但也需小心,不要让单个Slot内的Task数量过多。
另一种选择: 其实,我们可以通过增大containerized.heap-cutoff-ratio
到0.4
或是更高来解决当前问题(预留足够多的Memory给到Stack/Container/Socket
使用),但这样势必会减少Heap的大小,考虑到Task内不断增多的Local Cache
,所以并未做此调整;另一方面,通过JMX观测到GC的CPU占用超过20%且抖动厉害,只增大MaxDirectMemorySize
是解决不了这个GC问题的,而通过减少线程数可以很好减少GCRoot,降低线程切换频率以降低GC压力;
起初对Job的设计思考中是想尽量减少资源占用,最极端的办法就是将数据读取/处理/写出的整个流程放在同一个JVM进程中,这样Operator间的数据传递就可以在进程内部完成而减少网络间的数据传递;由此想到Chain All Operators Together
,这样就必须把source/filter/sink
的并行度设置为相同的值;然而,这就让每个Task都持有一个Sink实例,同时因为Slot Sharing
特性的存在,每个Container内部会运行多个Task,这就导致单个Container中存在多个Sink实例,特别是Es Sink
这种较为重的Operator(内部维护的线程/缓存/状态较多);Es Sink
实例对象的增加导致了线程数的成倍增加,所有线程持有的Stack Memory
总和也成倍增加,用于运行Job Task
的Memory数量进一步减少,另一方面:每个Container所持有的2个CPU资源也很难支撑太多数量的线程高效运行。
所以,在了解到上述内容后,首先需要做的事情是减少Container内的线程数(降低Stack Memory
占用,减少频繁的线程上下文切换),减少线程办法就是减少ES Sink
的数量(source/filter
线程少可忽略不计),从得出了当前的解决方案;
最后,将Es Sink
并行度调整为2后问题得到彻底解决,至今Job已平稳运行了半个多月;
注: JVM很重要,清晰定位问题很重要;在最初的问题排查过程中,自己总试图从heap/off-heap
占用寻找答案,甚至还将heap/off-heap
占用的Memory进行拆分加和以计算是否存在使用过量的情况,耗费了不少精力。