近期公司内部的Flink Job从Standalone迁移OnYarn时发现Job性能较之前有所降低,迁移前有8.3W+/S的数据消费速度,迁移到Yarn后分配同样的资源但消费速度降为7.8W+/S,且较之前的消费速度有轻微的抖动;经过简单的原因分析和测试验证得以最终解决,方案概述:在保持分配给Job的资源不变的情况下将总Container数量减半,每个Container持有的资源从1C2G 1Slot
变更为2C4G 2Slot
。本文写作的起因是经历该问题后发现深入理解Slot和Flink Runtime Graph是十分必要的;本文主要分为两个部分,第一部分详细的分析Flink Slot与Job运行关系,第二部详细的说下遇到的问题和解决方案。
Flink集群是由JobManager(JM), TaskManager(TM)两大组件组成的,每个JM/TM都是运行在一个独立的JVM进程中,JM相当于Master 是集群的管理节点,TM相当于Worker 是集群的工作节点,每个TM最少持有1个Slot,Slot是Flink执行Job时的最小资源分配单位,在Slot中运行着具体的Task任务。对TM而言:它占用着一定数量的CPU和Memory资源,具体可通过taskmanager.numberOfTaskSlots
, taskmanager.heap.size
来配置,实际上taskmanager.numberOfTaskSlots
只是指定TM的Slot数量并不能隔离指定数量的CPU给TM使用,在不考虑Slot Sharing(下文详述)的情况下一个Slot内运行着一个SubTask(Task实现Runable,SubTask是一个执行Task的具体实例),所以官方建议taskmanager.numberOfTaskSlots
配置的Slot数量和CPU相等或成比例;当然,我们可以借助Yarn等调度系统,用Flink On Yarn的模式来为Yarn Container分配指定数量的CPU资源以达到较严格的CPU隔离(Yarn采用Cgroup做基于时间片的资源调度,每个Container内运行着一个JM/TM实例);taskmanager.heap.size
用来配置TM的Memory,如果一个TM有N个Slot,则每个Slot分配到的Memory大小为整个TM Memory的1/N,同一个TM内的Slots只有Memory隔离,CPU是共享的;对Job而言:一个Job所需的Slot数量大于等于Operator配置的最大Parallelism数,在保持所有Operator的slotSharingGroup
一致的前提下Job所需的Slot数量与Job中Operator配置的最大Parallelism相等。
关于TM/Slot之间的关系可以参考如下从官方文档截取到的三张图:
图一: Flink On Yarn的Job提交过程,从图中我们可以了解到每个JM/TM实例都分属于不同的Yarn Container,且每个Container内只会有一个JM或TM实例;通过对Yarn的学习我们可以了解到每个Container都是一个独立的进程,一台物理机可以有多个Container存在(多个进程),每个Container都持有一定数量的CPU和Memory资源,而且是资源隔离的,进程间不共享,这就可以保证同一台机器上的多个TM之间是资源隔离的(Standalone模式下同一台机器下若有多个TM是做不到TM之间的CPU资源隔离)。
图二: Flink Job运行图,图中有两个TM,各自有3个Slot,2个Slot内有Task在执行,1个Slot空闲;若这两个TM在不同Container或容器上则其占用的CPU和Memory是相互隔离的;TM内多个Slot间是各自拥有 1/3 TM的Memory,共享TM的CPU,网络(Tcp:ZK, Akka, Netty服务等),心跳信息,Flink结构化的数据集等。
图三: Task Slot的内部结构图,Slot内运行着具体的Task,它是在线程中执行的Runable对象(每个虚线框代表一个线程),这些Task实例在源码中对应的类是org.apache.flink.runtime.taskmanager.Task
;每个Task都是由一组Operators Chaining在一起的工作集合,Flink Job的执行过程可看作一张DAG图,Task是DAG图上的顶点(Vertex),顶点之间通过数据传递方式相互链接构成整个Job的Execution Graph。
Operator Chain是指将Job中的Operators按照一定策略(例如:single output operator可以chain在一起)链接起来并放置在一个Task线程中执行;Operator Chain默认开启,可通过StreamExecutionEnvironment.disableOperatorChaining()
关闭,Flink Operator类似Storm中的Bolt,在Strom中上游Bolt到下游会经过网络上的数据传递,而Flink的Operator Chain将多个Operator链接到一起执行,减少了数据传递/线程切换等环节,降低系统开销的同时增加了资源利用率和Job性能;实际开发过程中需要开发者了解这些原理并能合理分配Memory和CPU给到每个Task线程。
注: 【一个需要注意的地方】Chained的Operators之间的数据传递默认需要经过数据的拷贝(例如:kryo.copy(…)),将上游Operator的输出序列化出一个新对象并传递给下游Operator,可以通过ExecutionConfig.enableObjectReuse()
开启对象重用,这样就关闭了这层copy操作,可以减少对象序列化开销和GC压力等,具体源码可阅读org.apache.flink.streaming.runtime.tasks.OperatorChain
与org.apache.flink.streaming.runtime.tasks.OperatorChain.CopyingChainingOutput
;官方建议开发人员在完全了解reuse内部机制后才使用该功能,冒然使用可能会给程序带来bug。
Operator Chain效果可参考如下官方文档截图:
图四: 图的上半部分是StreamGraph视角,有Task类别无并行度,如图:Job Runtime时有三种类型的Task,分别是Source->Map
, keyBy/window/apply
, Sink
,其中Source->Map
是Source()
和Map()
chaining在一起的Task;图的下半部分是一个Job Runtime期的实际状态,Job最大的并行度为2,有5个SubTask(即5个执行线程);若没有Operator Chain,则Source()
和Map()
分属不同的Thread,Task线程数会增加到7,线程切换和数据传递开销等较之前有所增加,处理延迟和性能会较之前差。补充:在slotSharingGroup
用默认或相同组名时,当前Job运行需2个Slot(与Job最大Parallelism相等)。
Slot Sharing是指来自同一个Job且拥有相同slotSharingGroup
(默认:default)名称的不同Task的SubTask之间可以共享一个Slot,这使得一个Slot有机会持有Job的一整条Pipeline,这也是上文我们提到的在默认slotSharing的条件下Job启动所需的Slot数和Job中Operator的最大parallelism相等的原因;通过Slot Sharing机制可以更进一步提高Job运行性能,在Slot数不变的情况下增加了Operator可设置的最大的并行度,让类似window这种消耗资源的Task以最大的并行度分布在不同TM上,同时像map, filter这种较简单的操作也不会独占Slot资源,降低资源浪费的可能性。
具体Slot Sharing效果可参考如下官方文档截图:
图五: 图的左下角是一个soure-map-reduce
模型的Job,source和map是4 parallelism
,reduce是3 parallelism
,总计11个SubTask;这个Job最大Parallelism是4,所以将这个Job发布到左侧上面的两个TM上时得到图右侧的运行图,一共占用四个Slot,有三个Slot拥有完整的source-map-reduce
模型的Pipeline,如右侧图所示;注:map
的结果会shuffle
到reduce
端,右侧图的箭头只是说Slot内数据Pipline,没画出Job的数据shuffle
过程。
图六: 图中包含source-map[6 parallelism]
,keyBy/window/apply[6 parallelism]
,sink[1 parallelism]
三种Task,总计占用了6个Slot;由左向右开始第一个slot内部运行着3个SubTask[3 Thread],持有Job的一条完整pipeline;剩下5个Slot内分别运行着2个SubTask[2 Thread],数据最终通过网络传递给Sink
完成数据处理。
Flink在默认情况下有策略对Job进行Operator Chain 和 Slot Sharing的控制,比如:将并行度相同且连续的SingleOutputStreamOperator操作chain在一起(chain的条件较苛刻,不止单一输出这一条,具体可阅读org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable(...)
),Job的所有Task都采用名为default的slotSharingGroup
做Slot Sharing。但在实际的需求场景中,我们可能会遇到需人为干预Job的Operator Chain 或 Slot Sharing策略的情况,本段就重点关注下用于改变默认Chain 和 Sharing策略的API。
StreamExecutionEnvironment.disableOperatorChaining(): 关闭整个Job的Operator Chain,每个Operator独自占有一个Task,如上图四
所描述的Job,如果disableOperatorChaining
则 source->map
会拆开为source()
, map()
两种Task,Job实际的Task数会增加到7;这个设置会降低Job性能,在非生产环境的测试或profiling时可以借助以更好分析问题,实际生产过程中不建议使用。
someStream.filter(…).map(…).startNewChain().map(): startNewChain()
是指从当前Operator[map]
开始一个新的chain,即:两个map会chaining在一起而filter不会(因为startNewChain的存在使得第一次map与filter断开了chain)。
someStream.map(…).disableChaining(): disableChaining()
是指当前Operator[map]
禁用Operator Chain,即:Operator[map]会独自占用一个Task。
someStream.map(…).slotSharingGroup(“name”): 默认情况下所有Operator的slotGroup都为default
,可以通过slotSharingGroup()
进行自定义,Flink会将拥有相同slotGroup名称的Operators运行在相同Slot内,不同slotGroup名称的Operators运行在其他Slot内。
Operator Chain有三种策略ALWAYS
, NEVER
, HEAD
,详细可查看org.apache.flink.streaming.api.operators.ChainingStrategy
;startNewChain()
对应的策略是ChainingStrategy.HEAD
(StreamOperator
的默认策略),disableChaining()
对应的策略是ChainingStrategy.NEVER
,ALWAYS
是尽可能的将Operators chaining在一起; 在通常情况下ALWAYS
是效率最高,很多Operator会将默认策略覆盖为ALWAYS
,如filter, map, flatMap等函数。
JOB说明:
类似StreamETL,100 parallelism,即:一个流式的ETL Job,不包含window等操作,Job的并行度为100;
环境说明:
Standalone下的Job Execution Graph:10TMs * 10Slots-per-TM
,即:Job的Task运行在10个TM节点上,每个TM上占用10个Slot,每个Slot可用1C2G
资源,GCConf:-XX:+UseG1GC -XX:MaxGCPauseMillis=100
;
OnYarn下初始状态的Job Execution Graph:100TMs * 1Slot-per-TM
,即:Job的Task运行在100个Container上,每个Container上的TM持有1个Slot,每个Container分配1C2G
资源,GCConf:-XX:+UseG1GC -XX:MaxGCPauseMillis=100
;
OnYarn下调整后的Job Execution Graph:50TMs * 2Slot-per-TM
,即:Job的Task运行在50个Container上,每个Container上的TM持有2个Slot,每个Container分配2C4G
资源,GCConfig:-XX:+UseG1GC -XX:MaxGCPauseMillis=100
;
注: OnYarn下使用了与Standalone一致的GC配置,当前Job在Standalone或OnYarn环境中运行时,YGC, FGC频率基本相同,OnYarn下单个Container的堆内存较小使得单次GC耗时减少,生产环境中大家最好对比下CMS和G1,选择更好的GC策略,当前上下文中暂时认为GC对Job性能影响可忽略不计。
问题分析:
引起Job性能降低的原因不难定位,贴一张Container的线程图(VisualVM中的截图)图7:在一个1C2G的Container内有126个活跃线程,守护线程78个;首先,在一个1C2G的Container中运行着126个活跃线程,频繁的线程切换是会经常出现的,这让本来不就不充裕的CPU显得更加的匮乏;其次,真正与数据处理相关的线程是红色画笔圈起的14条线程(2条Kafka Partition Consumer
,Consumers和Operators包含在这个两个线程内;12条Kafka Producer
线程,将处理好的数据sink到Kafka Topic),这14条线程之外的大多数线程在相同TM不同Slot间可以共用,比如:ZK-Curator, Dubbo-Client, GC-Thread, Flink-Akka, Flink-Netty, Flink-Metrics等线程,完全可以通过增加TM下Slot数量达到多个SubTask共享的目的;此时我们会很自然的得出一个解决办法:在Job使用资源不变的情况下,减少Container数量的同时增加单个Container持有的CPU, Memory, Slot数量,比如上文环境说明中从方案2
调整到方案3
,实际调整后的Job运行稳定了许多且消费速度与Standalone基本持平。
注:
当前问题是公司内迁移类似StreamETL的Job时遇到的,解决方案简单不带有普适性,对于带有window算子的Job需要更仔细缜密的问题分析;
当前公司Deploy到Yarn集群的Job都配置了JMX/Prometheus两种监控,单个Container下Slot数量越多每次scrape的数据越多,实际生成环境中需观测是否会影响Job正常运行,在测试时将Container配置为3C6G 3Slot
时发现一次java.lang.OutOfMemoryError: Direct buffer memory
的异常,初步判断与Prometheus Client相关,可适当调整JVM的MaxDirectMemorySize
来解决,异常如下图8:
Operator Chain是将多个Operator链接在一起放置在一个Task中,只针对Operator;Slot Sharing是在一个Slot中执行多个Task,针对的是Operator Chain之后的Task;这两种优化的都充分利用了计算资源,减少了不必要的开销,提升了Job的运行性能。此外,Operator Chain的源码在streaming包下,只在流处理任务中有这个机制;Slot Sharing在flink-runtime包下,似乎应用更广泛一些(具体没考究)。
最后,只有充分的了解Slot,Operator Chain,Slot Sharing是什么以及各自的作用和相互间的关系,才能编写出优秀的代码并高效的运行在集群上。
结尾:贴一张官方讲解Slot的详图供大家参考学习:
Flink
是由公司前辈引入,最初版本是Flink 1.1.5
,后升级到Flink 1.3.2
一直以来Flink Job
都是跑在Standalone
集群上的,为规避Standalone
集群中Job
相互影响的问题(Standalone
无法做资源隔离),前辈们又新做了Flink
集群,将互相影响的Job
拆分到不同集群中,这时团队内部有多个Flink Standalone
集群,虽然解决了部分Job
互相影响的问题,但有些服务是和Kafka
,Druid
部署在同一台物理机器的(不同进程),又出现过几次不同服务互相影响的情况;这些Flink Standalone
集群维护/升级起来也挺麻烦的,同时新Job
越来越多,生活苦不堪言~.~;兄弟们从2018年初就喊着OnYarn
,结果由于人力、机器、业务迭代等各种问题一拖再拖;到2018年底终于开始了这项工作,虽然只1个人力投入,经历2-3个月的时间团队的streaming Job
已经稳定运行在OnYarn
模式下,本文就重点记录下这段时所遇到的一些重点问题,当前生产运行版本是Flink 1.7.1 + Hadoop 3.1.0
由于官方没有依赖Hadoop 3.1.0
的Flink
下载包,所以这步需要自行编译Flink
源码,可参考官方Doc:https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html;建议用公司内部环境(Jenkins, JDK, etc.)并结合官方Doc来完成打包。
简单列下我司打包的Jenkins配置:
Branches to build
: refs/tags/release-1.7.1
Repository URL
: git@gitlab.company.com:git/flink.git
Build - Goals and options
: clean install -DskipTests -Dfast -Dhadoop.version=3.1.0
Post Steps - Execute shell
: cd flink-dist/target/flink-1.7.1-bin
tar -czvf flink-1.7.1-hadoop31-scala_2.11.tar.gz flink-1.7.1/
报错:
使用./bin/flink run -m yarn-cluster
的方式提交Job时出现如下报错:
client端:
------------------------------------------------------------
The program finished with the following exception:
java.lang.RuntimeException: Error deploying the YARN cluster
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:556)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:72)
at org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:962)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:243)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
Caused by: java.lang.RuntimeException: Couldn`t deploy Yarn cluster
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:443)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:554)
... 12 more
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
server端:
Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-5] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
java.lang.VerifyError: (class: org/jboss/netty/channel/socket/nio/NioWorkerPool, method: newWorker signature: (Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;) Wrong return type in function
at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:295)
at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:251)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
原因:
Flink 1.3.2
依赖的Netty3
的版本较低,与Hadoop 3.1.0
不兼容导致的报错;不兼容的Netty
版本为:Netty-3.4.0.Final
和Netty-3.10.6.Final
;曾尝试通过修改NioWorkerPool
的源文件并编译新的class文件以解决这个问题,但以失败告终;
解决:
升级Flink
版本;经测试:Flink 1.6, 1.7
+ Hadoop 3.1.0
编译出的包是可以正常提交给Hadoop 3.1.0
集群的;
结论:
Flink 1.3.2
与Hadoop 3.1.0
存在兼容性问题,若需提交Flink Job
到Hadoop 3.1.0
及以上,最好使用Flink 1.6+
Flink
类型系统和序列化方式可关注官方doc:
https://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html
Flink
默认支持PoJoSerializer
,AvroSerializer
,KryoSerializer
三种序列化,官方推荐使用Flink自己实现的PojoSerializer
序列化(在遇到无法用PoJoSerializer转化的时默认用KryoSerializer
替代,具体可关注GenericTypeInfo
),Kryo
性能较Avro
好些,但无供跨语言的解决方案且序列化结果的自解释性较Avro
差;经过功能性能等多方面的调研,计划Task
间数据传递用Kryo
,提高数据序列化性能,状态的checkpoint
,savepoint
用Avro
,让状态有更好的兼容&可迁移性;此功能从Flink 1.7+
开始支持具体关注doc:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/custom_serialization.html
OK,接下来说遇到的在Avro
下的对象序列化异常,本次bug和上边的内容无关~,~. 这个报错只是因为Flink-Avro
不支持null
的序列化;
报错:
开启Avro
序列化(enableForceAvro()
),Task
间通过网络传递数据对象时,若对象内部有field
为null
,数据无法正常序列化抛出如下异常:
java.lang.RuntimeException: in com.app.flink.model.Line in string null of string in field switchKey of com.app.flink.model.Line
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:104)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:83)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:41)
... more
Caused by: java.lang.NullPointerException: in com.app.flink.model.Line in string null of string in field switchKey of com.app.flink.model.Line
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.flink.api.java.typeutils.runtime.AvroSerializer.serialize(AvroSerializer.java:135)
... 18 more
Caused by: java.lang.NullPointerException
at org.apache.avro.specific.SpecificDatumWriter.writeString(SpecificDatumWriter.java:65)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:76)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
原因:
Flink
在初始化Avro Serializer
时使用org.apache.avro.reflect.ReflectData
构造DatumReader
和DatumWriter
,ReflectData
本身不支持null
的序列化(Avro
有提供ReflectData
的扩展类org.apache.avro.reflect.AllowNull
来支持null
的序列化,但Flink
并未使用),从而导致null field
的序列化异常;而Kryo
默认支持null field
,程序可正常运行;Flink 1.3.2
下代码截图如下:
Avro:
Kryo:
Flink 1.7
后对集成Avro
的代码变更较大,但依旧会出现当前Bug,代码逻辑入口:
解决:
field
值赋默认值;Kryo
(enableForceKryo()
)序列化;enableForceXXX()
尝试下默认的PoJoSerializer
;本问题在Kafka0.8 + Flink 1.7
(cannot guarantee End-to-End exactly once
)环境下造成了数据丢失的问题;
报错:
异常一:kafka异常(导致了Job重启)
Caused by: java.lang.Exception: Could not complete snapshot 69 for operator wep
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
... 8 more
Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:375)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:363)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)
... 13 more
异常二:Job重启,Dubbo异常
com.alibaba.dubbo.rpc.RpcException: Rpc cluster invoker for interface com.etl.commons.api.GenericService on consumer xxx.xx.xx.xx use dubbo version flink is now destroyed! Can not invoke any more.
at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.checkWheatherDestoried(AbstractClusterInvoker.java:233)
at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:215)
at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72)
at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
at com.alibaba.dubbo.common.bytecode.proxy0.queryBaseDimension(proxy0.java)
过程:
Kafka
抖动或异常导致Flink sink data to Kafka
时丢失Topic Leader
(出现异常一),进而导致Flink Job
重启;
Flink Job
重启正常,Job
状态是running
,但flink operator
中的Dubbo
未初始化导致无法提供服务(在Flink Job
重启过程中由于DubboService.java
中代码不规范而导致Dubbo
服务未初始化,Dubbo
服务不可用);
Flink Source
开始消费数据,当需调用含有DubboService
的Operator
处理数据时抛异常(出现异常二),数据无法正常处理并下发到下一个ChainOperator
;
上游数据不断被消费,但无法正常处理并下发,这就照成的Source
端丢数的现象;
原因:
在Standalone
模式下Flink Job
的失败自动重启Dubbo
服务可以正常初始化的,但在OnYarn
模式下初始化Dubbo
服务会不可用;
Flink Standalone
下失败重启过程中,DubboService
会先卸载再重新加载,Dubbo
服务正常初始化;
OnYarn
下失败重启的过程中类DubboService
不会被JVM卸载,同时代码书写不规范,导致内存驻留了已被closed的DubboService
;
调用已经closed的Dubbo
服务则数据无法正常处理并抛异常;
问题代码:
private static DubboService dubboService;
public static void init() {
if (dubboService == null) {
Dubbo.init();
logger.info("Dubbo initialized.");
} else {
logger.info("Dubbo already existed.");
}
}
// 未对dubboService做置空处理
public static void destroy() {
dubboService.destroy();
}
解决:
在destory
方法中清除所有static
的共享变量,使得job
重启时候可以正常重新初始化Dubbo
服务,如下代码:
修改代码:
public static void destroy() {
dubboService.destroy();
// 置空
dubboService = null;
}
补充:
OnYarn
的Flink Job
重启时候是不会释放Container
并重新申请的,自动重启前后的JobId
保持不变;
Standalone
模式下,User code的是在TaskManager
环境中加载启动的用的ClassLoader
是:org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader
;
OnYarn
模式下(bin/flink run -m yarn-cluster
),User Code
和Flink Framework
一并提交到Container
中,User Code
加载用的ClassLoader
是:sun.misc.Launcher$AppClassLoader
;
由于ClassLoader
不同,导致User Code
卸载时机不同,也就导致在OnYarn
模式下Restart Job
,DubboService
初始化异常;
可参考Flink Docs:https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/debugging_classloading.html
注: coding
时候尽量用RichFunction
的open()/close()
方法初始化对象,尽可能用(单例)对象持有各种资源,减少使用static
(生命周期不可控),及时对static
修饰对象手动置空;
解决该问题需要对Flink的Task数据交换&内存管理有深刻的理解,可参考文章: https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
环境:
Job
的运行Topology
图:
graph LR
A(Task-Front `parallelism:30`) -->|shuffle| D(Task-End `parallelism:90`)
Task-Front
负责从Kafka读取数据、清洗、校验、格式化,其并行度为30;
Task-End
负责数据的解析&处理,并发送到不同的Kafka Topic
;
Task-Front
并行度为30,Task-End
并行度为90;
Task-Front
通过网络IO将数据传递给Task-End
;
Job压测环境:
Flink 1.3.2
的Standalone
模式;
2个JobManager
高可用;
8个TaskManager
(一台物理机一个TaskManager
),主要配置如下:
taskmanager.numberOfTaskSlots: 13
taskmanager.network.numberOfBuffers: 10240
taskmanager.memory.segment-size: 32768 # 32K
taskmanager.memory.size: 15g
问题:
在Standalone
下由于taskmanager.network.numberOfBuffers
配置数量过小导致data exchange buffer
不够用,消费Kafka
数据的速度减缓并出现大幅度的抖动,严重情况下导致FlinkKafkaConsumer
停止消费数据,从JMX
监控看到如下现象:
:
分析:
从JMX
监控看,主要是因为Buffer
原因导致的线程wait
,cpu
抖动;
共8台机器,每台机器上有13个Slot
,根据Job
是30 + 90
的并行度来看,每台机器上同时存在多个Task-Front
和Task-End
;
Task-Front
需将处理好的数据存储到ResultPartition
;
Task-End
需将接收到的数据存储到InputGate
以备后续处理;
Task-End
将处理好的数据直接发送给Kafka Topic
,所以不需要ResultPartition
;
一个Task-Front
需将自己处理的数据shuffle
到90个Task-End
;
通过Flink Buffer
分配原理可知,需要30*90
个ResultPartition
,90*30
个InputGate
;
若平均分则每个ResultPartition/InputGate
可分配到的Buffer
数量是:
(10240 * 8) / (30*90 + 90 * 30) ≈ 15个(每个32k);
在8w+/s, 5~10k/条
的数据量下的15个Buffer
会在很短时间被填满,造成Buffer
紧缺,Task-Front
无法尽快将数据shuffle
到下游,Task-End
无法获取足够的数据来处理;
解决:
增大taskmanager.network.numberOfBuffers
数量,最好保证每个ResultPartition/InputGate
分配到90+(压测预估值,不保证效果最佳)的Buffer
数量;
补充:
在OnYarn
下几乎不会遇到该为题,了解该问题有助于理解Flink
内存管理&数据传递;
Flink 1.5
之后Buffer
默认用堆外内存,并deprecated
了taskmanager.network.numberOfBuffers
,用taskmanager.network.memory.max
与taskmanager.network.memory.min
代替;
关于Buffer
分配&动态调整等逻辑可关注以下几个类:TaskManagerRunner
/ Task
/ NetworkEnvironment
/ LocalBufferPool
/ InputGate
/ ResultPartition
/ MemorySegment
,在此不做源码解读,最后贴两个截图:
Standalone
与OnYarn
下的日志配置和输出路径有较大的区别,Standalone
下直接在maven
的resources
目录下配置log4j2.xml/log4j.properties
即可完成日志配置,但在OnYarn
下 需要通过修改${flink.dir}/conf/log4j.properties
的配置来定义日志输出,这些配置在container.sh
启动TaskManager/JobManager
时被加载以初始化log4j
,配置如下:
# Log all infos in the given file
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
# ${log.file} input by luanch_container.sh
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.encoding=UTF-8
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
${flink.dir}/conf
下不同log配置文件所适用的范围如下图:
${flink.dir}
中默认不包含DailyRollingFileAppender
的依赖,所以在使用DailyRollingFileAppender
时还需要添加依赖apache-log4j-extras-1.2.17.jar
到${flink.dir}/lib/
目录,Flink Job
的日志会输出到Yarn Container
的日志目录下(由yarn-site.xml
中的yarn.nodemanager.log-dirs
指定),Container
的启动脚本(可在YarnUI log
中找到)示例:
// log.file 即日志输出目录
exec /bin/bash -c "$JAVA_HOME/bin/java -Xms1394m -Xmx1394m -XX:MaxDirectMemorySize=654m -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -Dlog.file=/home/hadoop/apache/hadoop/latest/logs/userlogs/application_1547994202702_0027/container_e02_1547994202702_0027_01_000002/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> /home/hadoop/apache/hadoop/latest/logs/userlogs/application_1547994202702_0027/container_e02_1547994202702_0027_01_000002/taskmanager.out 2> /home/hadoop/apache/hadoop/latest/logs/userlogs/application_1547994202702_0027/container_e02_1547994202702_0027_01_000002/taskmanager.err"
运行中的FlinkOnYarn Job日志查看:
直接通过YarnUI
界面查看每个Container
的日志输出,如下图:
在container
所在节点的对应目录下通过tail, less
等shell命令查看日志文件,如下图:
在hadoop
配置enable log aggregation
时,可以通过yarn logs -applicationId ${application_id}
获取log;
已完成的FlinkOnYarn Job日志查看:
在配置hadoop log aggregation
时,可以通过yarn logs -applicationId ${application_id}
获取log;
注: Spark
中HistoryServer
支持用户从YarnUI
中查看已完成Job
的日志等,但Flink
中的HistoryServer
在OnYarn
下不可用,留下问题后续解决;
FlinkUI
的指标监控使用起来不够方便,同时在Job
比较大时往FlinkUI
上添加监控指标时卡顿现象非常明显,需要选择一个更好的Job监控工具来完成Job的监控任务,Prometheus
是一个很好的选择;具体说明可参考官方Doc:https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html;
Flink
从1.4版本开始支持Prometheus
监控PrometheusReporter
,在Flink1.6
时有加入了PrometheusPushGatewayReporter
的功能;
PrometheusReporter
是Prometheus
被动通过Http
请求Flink
集群以获取指标数据;
PrometheusPushGatewayReporter
是Flink
主动将指标推送到Gateway
,Prometheus
从Gateway
获取指标数据;
OnYarn
下Flink
端开启Prometheus
监控的配置:
// Flink可以同时定义多个reportor,本示例定义了jmx和prometheus两种reportor
// JMX用RMI,Prom用Http
metrics.reporters: my_jmx_reporter,prom
metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9045
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9275
当Job
分配到Yarn Container
启动JobManager/TaskManager
的过程中会实例化metrics.reporter.xxx.class
配置的类,启动监控服务;
PrometheusReporter
是通过暴漏Http地址来提供监控指标访问功能的,在每个JobManager/TaskManager
启动的过程中都会实例化一个PrometheusReporter
对象,并启动HttpServer
服务绑定到一个端口上
(metrics.reporter.prom.port
配置的区间从小到大第一个未占用的端口);因此metrics.reporter.prom.port
配置的区间可允许的最大端口数必须大于等于一台机器最大可启动的Container
数量,
否则会有监控服务启动失败的报错,如下:
报错:
2019-03-12 20:44:39,623 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - Configuring prom with {port=9250, class=org.apache.flink.metrics.prometheus.PrometheusReporter}.
2019-03-12 20:44:39,627 ERROR org.apache.flink.runtime.metrics.MetricRegistryImpl - Could not instantiate metrics reporter prom. Metrics might not be exposed/reported.
java.lang.RuntimeException: Could not start PrometheusReporter HTTP server on any configured port. Ports: 9250
at org.apache.flink.metrics.prometheus.PrometheusReporter.open(PrometheusReporter.java:70)
at org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:153)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:137)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:330)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner$1.call(TaskManagerRunner.java:301)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner$1.call(TaskManagerRunner.java:298)
本文简单总结了Flink使用过程中遇到的几个重要的问题,taskmanager.network.numberOfBuffers配置
的问题有在stackoverflow, FlinkCommunity问过,但没得到较为准确的答案,最终通过gg文档&源码才逐渐理解,源码&Google的重要性不言而喻!
第一次听说协变&逆变
是刚接触Scala
的时候,但协变&逆变
却不是Scala
所特有的,Java
, C#
等语言也有协变&逆变
的概念,本文首先会解释协变&逆变
这两个术语的含义,然后进一步探讨它们在Scala
中具体的书写语法和使用场景。
协变&逆变
的概念入门可以关注下边这两个篇文章,本段内容也是学习这两篇文章所得到的:Covariance and Contravariance of Hosts and Visitors, Cheat Codes for Contravariance and Covariance。
实体
:Fuel
, Plant
, Bamboo
;图中的符号>
表示可替代性(类比多态:子类可作为父类使用),Plant
可以做为这一种Fuel
(燃料),Bamboo
是一种Plant
,在作为燃料的这件事情上也可以认为 Bamboo extends Plant
, Plant extends Fuel
;Box
:用于实体(Fuel
, Plant
, Bamboo
)的存放;
消费者
:Burn
, Animal
, Panda
;
/** box contains entities */
class Box<T> {
/**
* 若有一个Box<Plant>,可以用BoxVisitor<Fuel,_>, BoxVisitor<Plant,_>来消费他,不能用BoxVisitor<Bamboo,_>来消费;
* 对于BoxVisitor来讲T是逆变的,只能接收类型为T或T父类的BoxVisitor,即BoxVisitor<? super T>;
*
*/
public <R> R accept(BoxVisitor<? super T, ? extends R> visitor) {
return null;
}
}
/** consume entities in box */
class BoxVisitor<U, R> {
public R visit(Box<? extends U> box) {}
}
class Burn<R> extends BoxVisitor<Fuel, R> {
/**
* 对于Burn而言,它可消费Fuel,Plant,Bamboo,所以visit可接收任何有Fuel属性的Box;
* 由此Box<T>可以被认为是`协变`的,即如果T extends U 则 Box<T> extends Box<U>
*/
public R visit(Box<? extends Fuel> box) {}
}
Burn
可以燃烧一切包括:Fuel
, ` Plant,
Bamboo;
Animal可以吃所有类型的植物包括:
Plant,
Bamboo;
Panda只吃一种植物:
Bamboo`。由此,可得出下面这关系张图:
如果消费者是Burn
,我可以把Box<Fuel>
, Box<Plant>
, Box<Bamboo>
传递给它;如果消费者是Animal
,我可以把Box<Plant>
, Box<Bamboo>
传递给它;如果消费者是Panda
,我可以把Box<Bamboo>
传递给它。
对于Box<T>
而言,它存储的实体对象用于被使用/消费,所以它是一个生产者,若能将Box<Fuel>
传给某个BoxVisitor
,则也能将Box<Plant>
, Box<Bamboo>
传递给这个BoxVisitor
,这时Box<Fuel>
中的泛型是上界(upper bound
),需要用extends
关键字来定义Box<T>
的范围,该范围能被上文中的BoxVisitor
消费。即,若BoxVisitor
能接收装满T
的Box
,则肯定能接收装满T
子类的Box
(Box<T>
是生产者,remember: Prdoucer Extends
),Box<T>
随着它的泛型T
进行协变,协变的简单定义:
如果:
graph LR
A(Class A) -->|extends| B(Class B)
则:
graph LR
A(Box < A >) -->|extends| B(Box < B >)
注::Java中的数组是协变的,如下列Java代码可正常编译,但运行时会抛出ArrayStoreException
:
Box.Bamboo[] bamboos = new Box.Bamboo[1];
bamboos[0] = new Box.Bamboo();
Stream.of(bamboos).forEach(System.out::println);
Box.Plant[] plants = bamboos;
plants[0] = new Box.Plant();
// throw ArrayStoreException at running stage.
Stream.of(plants).forEach(System.out::println);
Fuel
的Box
,传递给Burn
(汽油只可以燃烧);可以将装满Plant
的Box
传递给Burn
, Animal
(植物可以燃烧&吃);可以将装满Bamboo
的Box
传递给Burn
, Animal
, Panda
(竹子可以燃烧,可以给动物&熊猫吃)。由此我们可以得出下面的这张图:
如果我有Box<Fuel>
,我可以把它传递消费者Burn
;如果我有Box<Plant>
,我可以把它传递给消费者Animal
, Burn
;如果我有Box<Bamboo>
,我可以把它传递给消费者Panda
, Animal
, Burn
。
对于BoxVisitor<T>
本身而言,它是一个可消费<T>
的消费者,若一个Box<Bamboo>
能被Panda
(即: BoxVisitor<Bamboo>
)消费,则其必然也能被Animal
(即: BoxVisitor<Plant>
), Burn
(即: BoxVisitor<Fuel>
)消费,BoxVisitor<Bamboo>
中的泛型Bamboo
是BoxVisitor
的下界(lower bound
),需用supper
来定义BoxVisitor<T>
中T
的取值范围,这个范围内的BoxVisitor
都可用于消费Box<T>
(BoxVisitor<T>
是消费者,remember: Consumer Super
),BoxVisitor<T>
是逆变的,逆变简单定义:
如果:
graph LR
A(Class A) -->|extends| B(Class B)
则:
BoxVisitor<Parent> extends BoxVisitor<Child>
graph LR
B(BoxVisitor < B >) -->|extends| A(BoxVisitor < A >)
协变&逆变在Java
中并没有上升到语法层面,而Scala
在语法层面对协变&逆变进行的详细的语法设计,Scala
中用类型参数[T]
表示不变,[+T]
表示协变,[-T]
表示逆变,具体Scala
中协变&逆变的语法特性&约束会在下文进行详细的分析讲解。
定义若干类结构:
class Fuel
class Plant extends Fuel
class Bamboo extends Plant
// 声明类型`Box`,其类型参数`T`是协变的
class Box[+T](ts:T*)
// 类型参数逆变
class BoxVisitor[-T]
一个类可被定义为协变
的前提是:这个类型类似Producer Extends
,并且是只读的(read-only
),若非只读则会出现如下问题场景(ArrayStoreException
异常):
// 假设Array是协变的
// 创建一个Circle数组并装入10个Circle对象
val circles: Array[Circle] = Array.fill(10)(Circle(..))
// 当Aarry[T]是协变时该行代码有效
val shapes: Array[Shape] = circles
// 将数组中第一个元素修改为Square对象(Square is a subtype of Shape)
// 若协变非read-only,此处编译通过,但运行会抛异常(向Array[Circle]添加Square对象,这是不被允许的)
shapes(0) = Square(..)
子类对象赋值给父类变量
因为Bamboo <: Plant <: Fuel
而且Box[+T]
是协变
的,所以Box[Bamboo] <: Box[Plant] <: Box[Fuel]
,同时由多态原则:子类可看做父类,最终得出下边有效代码:
val bamboos: Box[Bamboo] = new Box[Bamboo](new Bamboo())
// 根据协变特性,下列代码可正常运行
val plants: Box[Plant] = bamboos
val fuels: Box[Fuel] = bamboos
协变类型内部函数限定
/**
* covariance:readonly, non-writable
* 即:入参:限定不能将A & A的子类传递给Foo
* 返回值:无限制
*/
trait Foo[+T] {
// def fun[①](②)(op: ③): ④
def fun0[U >: T](u: U)(op: T => U): U
def fun1[U >: T](u: U)(op: T => U): U => T
}
定义的协变类Foo[+T]
,类内部定义函数的通式包含着组成函数的所有元素:①:类型参数定义,②:对象类型入参定义,③:函数类型的入参定义,④:返回值/函数定义,在协变类里每个位置的条件限制如下:
①类型参数:不能定义子类类型[U]
([U <: T]
),原因:Covariant type T occurs in contravariant position in type T of value U;
②对象入参:入参的类型不能为T
& subType of T
,原因:Covariant type T occurs in contravariant position in type T of value u;
③函数入参:函数的出参不能为T
& subType of T
,原因:Covariant type T occurs in contravariant position in type U => T of value op;
④返回值:无限制;
④返回函数: 函数入参不能为T
& subType of T
,原因:Covariant type T occurs in contravariant position in type T => T of value fun;
In A Picture:
逆变位置:只允许定义或传入super T
的类型,或是与T
无父子关系的类型;
协变位置:对类型无限制,可任意定义;
个人理解:协变
可看做生产者Producer Extends
并且 readonly
;所以协变类内部不能存在add(_ extends T)
的函数,对函数的返回值则无限制,所以就有了上图中的语法约束;
逆变
是与协变
相对的一个概念,可以将逆变
看做消费者Comsumer super
,并且是write-only
的,若非write-only
会出现如下问题场景:
// 假设Array是逆变的
// 首先创建数组并装入Shape对象
val shapes: Array[Shape] = Array.fill(10)(Shape(..), Shape(..))
// Works only if Array is contravariant
val circles: Array[Circle] = shapes
// 编译异常,circles(0)实际是Shape对象,是Circle的父类
val circle: Circle = circles(0)
父类对象赋值给子类变量
// 由于BoxVisitor是逆变的,所以下边代码可正常编译运行
val fuelVisitor: BoxVisitor[Fuel] = new BoxVisitor[Fuel]
val plantVisitor: BoxVisitor[Plant] = fuelVisitor
val bambooVisitor: BoxVisitor[Bamboo] = fuelVisitor
// 作为消费者,能消费类型`T`则必定能消费T的父类,但不一定能消费子类,
// 因为子类在拥有所有父类可被外部访问的变量和方法的同时扩展或覆盖了父类的变量和方法
逆变类型的内部函数限定
/**
* contravariance:write-only, non-readable
* consumer super
*/
trait Foo[-T] {
// def fun[①](②)(op: ③): ④
def fun0[U <: T](u: U)(op: U => T): U
def fun1[U <: T](u: U)(op: U => T): T => U
}
在逆变类Foo[-T]
中函数通式(def fun[①](②)(op: ③): ④
)的各位置的条件限制如下:
①类型参数:不能定义父类类型[U]
([U >: T]
),原因:Contravariant type T occurs in covariant position in type T of value U;
②对象入参:无限制;
③函数入参:函数的入参不能为T
& superType of T
,原因:Contravariant type T occurs in covariant position in type T => U of value op;函数出参无限制;
④返回值:不能为T
& superType of T
,原因:Contravariant type T occurs in covariant position in type T of value fun;
④返回函数: 函数出参不能为T
& superType of T
,原因:Contravariant type T occurs in covariant position in type U => T of value fun1;
In A Picture:
协变位置:只允许定义或传入extends T
的类型;
逆变位置:对类型无限制,可任意定义;
个人理解:逆变
可看做消费者Consumer super
并且 writeonly
;所以协变类内部不能存在return (_ extends T)
的函数,对函数的入参则无限制,所以就有了上图中的语法约束;
不变
就是平时常见的类型参数,比如ListBuffer[T]
, ArrayBuffer[T]
等都是不变的,可以对它们进行 read
& write
等操作,只要满足严格的类型约束要求即可;
scala中常见的协变
/逆变
类有:List[+T]
, Seq[+A]
, Option[+A]
, Future[+T]
, Tuple[..]
, Map[A, +B]
, Function1[-T1, +R]
, CanBuildFrom[-From, -Elem, +To]
, etc.
Function1[-T1, +R]
的另一种常见写法-T1 => +R
(入参逆变,出参协变)
示例
def main(args: Array[String]): Unit = {
// 协变
val list = List[String]("A", "B", "CC")
println(toInt(list))
// 协变&逆变组合
val fun = (v1: CharSequence) => v1.toString.map(_.toInt)
println(toInt(fun, "ABC"))
}
// 协变,入参类型允许的范围`List[_ <: CharSequence]`
def toInt(list: List[CharSequence]): Seq[Int] = list.map(_.length)
// 协变&逆变组合使用,第一个入参类型允许的范围`Function1[_>:String, _<:Seq[Int]]`
// 第一个入参类型范围的另一种表示:`_ >: String => _ <:Seq[Int]`
def toInt(fun: String => Seq[Int], in: String): Seq[Int] = fun(in)
本文主要对协变
,逆变
进行了详细的语法讲解&场景演示,但这些只是开始,大家需在平时工作中多结合应用场景多练习才能达到灵活运用的目的;此外协变
,逆变
只在scala
编译期进行语法校验,不影响runtime
,编译出的字节码会被部分裁切以满足JVM
字节码规范。
本文示例的代码存储在工程:https://github.com/itinycheng/jvm-lang-tutorial ,包com.tiny.lang.java.generic.variance
和 com.tiny.lang.scala.generic.variance
中。
Scala
类型参数与Java
的泛型体系是较相似的语言特性,但Scala
类型参数体系的实现比Java
更为的多样和复杂;Scala
类型参数体系包含如下:[多重]上界(T <: UpperBound
)、[多重]下界(T >: LowerBound
)、[多重]视图界定(T <% ViewBound
)、[多重]上下文界定(T : ContextBound
)、类型约束(T =:= U
, T <:< U
, T <%< U
)、协变(+T
)、逆变(-T
);可以认为Scala
类型参数在覆盖Java泛型所有特性的基础上又做了更多的扩展延伸,即Java
泛型是Scala
类型参数的一个子集。本文主要讲解Scala
参数类型的各种特性并辅助与Java
泛型做对比分析,是一个供给Java Developer学习用的Scala
类型参数入门指南。
首先定义若干类结构
trait Food
trait Medicine
class Fruit extends Food
class Apple extends Fruit
// 橘子是一种水果,橘子皮有药用价值
class Orange extends Fruit with Medicine
T <: upperBound
类型上界T >: lowerBound
类型下界T >: lowerBound <: upperBound
多重界定with
连接多个上界或多个下界用符号<:
来表示类型参数的上界,如_ <: Food
表示任意类型_
是Food
的子类,默认情况下Scala
类的上界是Any
(类似Java
中的Object
),即任意类型参数可表示为_ <: Any
。
scala
支持同时声明多个上界,两个上界类型之前用with
连接,例如:_ <: Fruit with Medicine
给任意类型_
定义了两个上界:Fruit
, Medicine
,在上文定义的类中只有Orange
满足这个多重上界的限定。with
可以多次使用,用以连接多个上界类型,语法如下:_ <: TypeA with TypeB with TypeC with ...
。
用符号>:
来表示类型参数的下界,如:_>:Fruit
表示任意类型_
需满足是Fruit
的父类的上界限定,scala
也支持多重上界的语法,如:_ >: Fruit with Food
表示任意类型_
必须同时满足是Fruit
和Food
的父类,上文定义类中无满足该要求的类,AnyRef
可以满足这个多重上界的要求(scala
中AnyRef
是任意引用类型的父类)。
上界和下界一起使用可被称为多重界定,即一个类型参数时有上界和下界,如:_ >: Apple <: Food
表示任意类型_
需满足以Apple
作为下界,同时以Food
作为上界,上文定义的类中Food
满足该限定;此外,一个类型参数也可以同时拥有多上界和多个下界,如:_ >: Apple with Food <: AnyRef with Serializable
定义了两个下界Apple
和Food
,以及两个上界AnyRef
和Serializable
。
/**
* 函数接收一个ListBuffer类型的入参,ListBuffer存放的对象必须是Fruit子类
*/
def test0(list: ListBuffer[_ <: Fruit]): Unit = {
// compile error: list ++= List(new Fruit,new Orange,new Apple)
list.foreach(println)
}
/**
* define a generic type `T` which has an upper bounds `Fruit`
*/
def test1[T <: Fruit](list: ListBuffer[T]): Unit = {
// compile error :list ++= List(new Fruit,new Orange,new Apple)
list.foreach(println)
}
/**
* 函数接收一个ListBuffer类型入参,ListBuffer存放对象必须是Fruit父类
*/
def test2(list: ListBuffer[_ >: Fruit]): Unit = {
list ++= List(new Apple, new Fruit)
list.foreach(println)
}
/**
* @see [[test2]]
*/
def test3[T >: Fruit](list: ListBuffer[T]): Unit = {
list ++= List(new Apple, new Fruit)
list.foreach(println)
}
/**
* multi upper bounds
*/
def test4[T <: Fruit with Medicine](list: ListBuffer[T]): Unit = {
//compile error: list ++= List(new Orange)
list.foreach(println)
}
/**
* multi lower bounds
*/
def test5[T >: Fruit with Medicine](list: ListBuffer[T]): Unit = {
list ++= List(new Orange)
list.foreach(println)
}
/**
* have upper & lower bounds at the same time
*/
def test6[T >: Apple <: Food](list: ListBuffer[T]): Unit = {
// compile error: list += new Fruit
list.foreach(println)
}
/**
* multi upper & lower bounds
*/
def test7[T >: Apple with Food <: AutoCloseable with Serializable](list: ListBuffer[T]): Unit = {
// compile error: list ++= List(new Orange, new Apple)
list.foreach(println)
}
实际上: 上界对应的是Java
中的extends
关键字,下界对应Java
中的super
关键字,scala
的上下界语法特性比Java
的语法表现更全面;在Java
中无法定义一个泛型<T super Fruit>
(可能在Java
看来super of every class is Object
);但Java
支持多重的上界,比如定义一个泛型T
必须继承两个类<T extends Fruit & Medicine>
,其中&
与scala
中的with
关键字相对。将scala
源码编译成字节码,然后反编译字节码进行观察发现之前定义的带有上下界的类型参数只剩下extends
关键字(对应<:
),而super
关键字(对应>:
)基本被擦除(Be Erased
),从这一层面来讲,上下界是scala
特有的语法层特性,是编译时特性非运行时,scala
源码编译成字节码时会进行语法表达的转换和裁切,以符合JVM的字节码规范。
T <% viewBound
视图界定(deprecated from scala 2.11
)用符号<%
来表示视图界定,T <% Juice
表示在当前.scala
文件的上下文中,存在一个隐式函数可以将类型T
转换为Juice
。
scala
语法也支持多重的视图界定,如:T <% Juice <% Soup
表示类型T
既可以隐式转换成Juice
也可以转换成Soup
。
注: 视图界定在Scala 2.11
版本已经deprecated
,请用隐式参数替换,如下代码示例中的test2
, test3
;
class Juice(food: Fruit){
def juice = "Juice"
}
class Soup(food: Fruit){
def soup = "Soup"
}
/**
* 水果可以榨汁
*/
implicit def fruitToJuice(fruit: Fruit): Juice = {
new Juice(fruit)
}
/**
* 水果可以做汤
*/
implicit def fruitToSoup(fruit: Fruit): Soup = {
new Soup(fruit)
}
// 引入隐式转换函数
import fruitToJuice, fruitToSoup
/**
* 该函数要求传入的ListBuffer中的T的对象可以隐式转换成Juice
* view bounds are deprecated,use implicit parameter instead.
* input parameter `T` can convert to `Juice`
*/
def test0[T <% Juice](list: ListBuffer[T]): Unit = list.foreach(i => println(i.juice))
/**
* multi view bound
*/
def test1[T <% Juice <% Soup](list: ListBuffer[T]): Unit = {
// 通过隐式转换成Juice而拥有juice函数
list.foreach(i => println(i.juice))
// 通过隐式转换成Juice而拥有soup函数
list.foreach(i => println(i.soup))
}
/**
* Use this function to replace [[test0()]]
*/
def test2[T](list: ListBuffer[T])(implicit fun: T => Juice): Unit = list.foreach(i => println(i.juice))
/**
* Use this function to replace [[test1()]]
*/
def test3[T](list: ListBuffer[T])(implicit f1: T => Juice, f2: T => Soup): Unit = {
list.foreach(i => println(i.juice))
list.foreach(i => println(i.soup))
}
// 调用函数
def main(args: Array[String]): Unit = {
val fruits = ListBuffer[Fruit](new Apple, new Orange, new Fruit)
test0(fruits)
test1(fruits)
test2(fruits)
test3(fruits)
}
实际上: 视图界定是scala
为了让coding更简洁高效而设计出的一个语法糖,其实现是scala
编译器在编译生成字节码过程中的一个自动插入隐式函数到所需位置的操作;如在上边的代码中ListBuffer[Fruit]
作为入参传入test0
,在foreach
时Fruit
对象是没有juice
函数供给调用的,编译这段代码时不通过,这时编译器会从上下文中找隐式转换,找到了fruitToJuice
,并调用该函数将fruit
对象转换为Juice
,然后在调用Juice
中的juice
函数(.class反编译结果类似list.foreach(i => println(fruitToJuice(i).juice))
);Java
中无对应特性与之对应,即每个调用必须明确,编译器并不做类似自动化的补全操作。
T: contextBound
上下文界定(存在ContextBound[T]
的隐式值)上下文界定的形式为T : M
,其中M
是另一个泛型类,它要求必须存在一个类型为M[T]
的隐式值。如:[T : Ordering]
表示必须存在一个Ordering[Fruit]
的隐式值,在使用隐式值的地方声明隐式参数
(如:(implicit ordering: Ordering[T])
)。
多重上下文界定形式为T : M : N
,表示当前代码上下文中必须存在类型M[T]
和N[T]
两个隐式值,同样,在使用隐式值的地方可以显式的声明隐式参数
。
首先定义隐式值
implicit object FruitOrdering extends Ordering[Fruit] {
override def compare(x: Fruit, y: Fruit): Int = {
x.getClass.getName.compareTo(y.getClass.getName)
}
}
/**
* alternative
*
* has the same effect as FruitOrdering, but not singleton, better use `FruitOrdering`
*/
implicit def ordering: Ordering[Fruit] = new Ordering[Fruit] {
override def compare(x: Fruit, y: Fruit): Int = x.getClass.getName.compareTo(y.getClass.getName)
}
import FruitOrdering
import scala.reflect.ClassTag
def test0[T: Ordering](first: T, second: T): Unit = {
// compile error: val arr = new Array[T](2)
println(first + ", " + second)
}
/**
* use `implicit` parameter or else `Fruit extends Ordering[Fruit]`
* 必须存在一个类型为`Ordering[T]`的隐式值
*/
def test1[T: Ordering](first: T, second: T)(implicit ordering: Ordering[T]): Unit = {
// compile error: val arr = new Array[T](2)
val small = if (ordering.compare(first, second) < 0) first else second
println(small)
}
/**
* ClassTag 用于保存运行时`T`的实际类型,`new Array[T](2)`就可以正常编译通过
*
* scala command compile result:
* `test2: [T](first: T, second: T)(implicit evidence$1: Ordering[T], implicit evidence$2: scala.reflect.ClassTag[T])Unit`
*/
def test2[T: Ordering : ClassTag](first: T, second: T)(implicit ordering: Ordering[T]): Unit = {
val arr = new Array[T](2)
val small = if (ordering.compare(first, second) < 0) first else second
println(arr + ", " + small)
}
// 调用函数
def main(args: Array[String]): Unit = {
test0(new Apple, new Orange)
test1(new Apple, new Orange)
test2(new Apple, new Orange)
}
说明:
关于隐式转换,隐式参数相关内容本文并没做太多的讲解,网上相关资料比较多,大家可以尝试自助学习下;
将上述函数test0
输入到scala command
得到的结果为test0: [T](first: T, second: T)(implicit evidence$1: Ordering[T])Unit
,即[:Ordering]
被编译成了隐式参数:implicit evidence$1: Ordering[T]
,这是上下文界定的特定编译方式,大家需要牢记这个编译规则;
上述函数test1
中显式的添加了隐式参数implicit ordering: Ordering[T]
,其原因是在编码阶段函数内部需要对Ordering[T]
的实例对象进行调用,不得不添加该隐式参数(编译期动态插入的隐式参数在编码阶段引用不到),test1
输入到scala command
得到的结果为test1: [T](first: T, second: T)(implicit evidence$1: Ordering[T], implicit ordering: Ordering[T])Unit
,在当前代码上下文中main
函数调用test1
的入参为(Apple, Orange, FruitOrdering ,FruitOrdering)
,即单例对象FruitOrdering
会同时出现在第三、四参数位上;
上述函数test2
所处的上下文中并没有找到与ClassTag
相对应的隐式值,这是scala
编译器在编译时对ClassTag
做特殊处理,在scala command
下的编译的结果在预料之中,出现ClassTag
的隐式参数,反编译字节码会发现new Array[T](2)
被转换为classTag.newArray(2)
,其内部通过调用Java
中的Array.newInstance(..)
动态创建数组对象;main
函数中调用test2
的代码的入参中隐式参数ClassTag
被编译成ClassTag..MODULE$.apply(Fruit.class)
,这是scala
编译器的对ClassTag
的特殊处理,大家明白是编译器行为即可。
类型约束是一种比较严格的类型限定方式:
T =:= U
表示T与U的类型相同T <:< U
表示T是U的子类型T <%< U
表示T可被隐式转换为U (2.10 deprecated
,2.11 removed
)T =:= U
是严格的类型约束,要求两个类型完全相等(包括类型参数),如:List =:= List
is true,但是List[Apple] =:= List[Orange]
is false;
T <:< U
是严格的类型约束(与<:
相比),要求前者T
必须是后者U
的子类或类型相同;
T <%< U
在scala 2.10
已经被标注deprecated,在scala 2.11
被移除;同时视图界定<%
从scala 2.11
开始被标记为deprecated,在未来版本可能会被移除掉,在大家用到视图界定时候最好的选择是在需要隐式转换的地方进行显式的声明(可关注本文视图界定
所讲述的内容);
注: 类型约束顾名思义是为对类型进行约束的,仅此而已;
/**
* restrict: T eq Orange
*/
def test0[T](i: T)(implicit ev: T =:= List[Fruit]): Unit = {
i.foreach(println)
}
/**
* same as [[test0()]]
*/
def test1[T](i: T)(implicit ev: =:=[T, Orange]): Unit = {
println(i)
}
def test2[T](list: List[T])(implicit ev: T <:< Fruit): Unit = {
val lis = new Fruit :: list
lis.foreach(println)
}
说明:
=:=
, <:<
实际上是两个在Predef.scala
定义好的两个类(sealed abstract class =:=[From, To] extends (From => To) with Serializable
与 sealed abstract class <:<[-From, +To] extends (From => To) with Serializable
),并在scala
编译器的协助之下完成类型约束的行为,该行为发生于代码编译期间(感慨scala
各种语法糖给编译器带来了大量的编译压力哈哈~~);
上述函数test0
, test2
中隐式参数的类型是scala
的中缀写法,原始写法如test1
所示=:=[A,B]
(带两个类型参数的类),当类有且仅有两个类型参数时候才能用中缀写法,更直观示例:def foo(f: Function1[String, Int])
可以替换为中缀形式def foo(f: String Function1 Int)
;
T <:< U
与T <: U
的异同点说明,二者都表示T
是U
的子类,但<:<
是更严格的类型约束, 要求在满足T
是U
子类的条件时不能对T
做类型推导&隐式转换的操作;而<:
则可以与类型推导&隐式转换配合使用;
下述代码在main
函数中用test3(1, List(new Apple))
调用def test3[A, B <: A](a: A, b: B)
时编译器正常编译通过,调用test3
时传入的第一个参数是Int
,第二个参数是List[Apple]
,显然不符合B <: A
的约束,为了满足这个约束编译器在做类型推导时会继续向上寻找父类型来匹配是否满足,于是第一个参数被推导为Any
类型,此时List[Int]
符合Any
的子类型,编译通过(用Java Decompiler
反编译字节码发现main
函数中调用test3
的入参类型为Int, List
,而这种入参类型不符合test3
编译后的入参要求B extends A
,即字节码合法性校验较编译器宽松许多,感觉需要读几本编译原理入门下,哈~);
def test3[A, B <: A](a: A, b: B): Unit = {
println(a + ", " + b)
}
// 调用函数
def main(args: Array[String]): Unit = {
test3(1, List(new Apple))
}
下述代码中在调用foo
时传入的Apple
,Orange
不能满足Apple <: Orange
,但由于隐式函数a2o
的存在,在编译main
函数中的第一行 foo(new Apple, new Orange)
时会引入隐式参数,将Apple
转换为Orange
,所以可成功编译并执行;而在编译第二行bar(new Apple, new Orange)
会报错error: Cannot prove that Apple <:< Orange
;
implicit def a2o(a:Apple) = new Orange
def foo[T, U<:T] (u:U, t:T) = print("OK")
def bar[T, U](u:U, t:T)(implicit f: U <:< T) = println("OK")
def main(args: Array[String]): Unit = {
// 编译成功,引入隐式函数`a2o`
foo(new Apple, new Orange)
// 编译失败,`<:<`是严格类型限定
// error: Cannot prove that Apple <:< Orange
bar(new Apple, new Orange)
}
scala
的协变
、逆变
、不变
的特性相比较其他更为的难理解,计划单独新开一篇文详细对比讲解;感兴趣的小伙伴们可以移驾:
https://itinycheng.github.io/2019/01/30/scala-vs-java’s-generics-2/
scala
类型参数体系是scala
语言的重要特性,相比Java
泛型,其更为复杂多变,且与编译器&隐式转换等互相掺杂配合,使得大家很难在短时间内掌握和灵活使用,建议大家多看scala
源码,多练习,多尝试,多用scala command
, Java Decompiler
等工具测试&反编译二进制文件,以求更深刻的了解scala
类型参数的语法特性&内在原理。
本文示例的代码存储在工程:https://github.com/itinycheng/jvm-lang-tutorial ,包com.tiny.lang.java.generic
和 com.tiny.lang.scala.generic
中。
近来在与两个技术朋友聊天时发现的一个简单有趣的问题,在这做个简短的记录;本文主要围绕问题是什么,在现有技术上下文中如何解决,有哪些优缺点。
问题预告:业务层面多个时区的数据如何映射到一张Druid表,查询时满足不同时区客户查询时都能按照各自时区的实际时间来统计数据;
假设我现在有一套统计分析的Saas
系统(简称:TA
),其包含两个模块,用户行为收集模块(简称:uTracker
)和行为数据统计分析模块(简称:uAnalyzer
)类似Google Analytics
或是Baidu统计
。
TA
系统的上下文说明:
TA
的客户将uTracker
引入到自己的App中,用以收集使用该APP的用户的行为数据;uTracker
随着APP的启动而启动并自动收集当前用户行为数据,比如:设备、上下文、访问内容等信息(简称:uData
),每条uData
都带有该行为数据生成的时间戳(简称:dTs
,从设备时钟获取)以及当前设备时钟对应的时区(简称:dZone
)。uTracker
将收集上来的数据进行合并、格式化、压缩等操作之后通过手机网络将数据发送到uAnalyzer
的前置数据收集系统;uAnalyzer
系统的清洗、补余,解析等等操作后入到统一的OLAP库中供给客户实时的查询他APP的新增、活跃、留存等指标数据;graph TB
A(用户A) -->|visit| UT(集成uTracker的APP)
B(用户B) -->|visit| UT
C(用户C) -->|visit| UT
D(......) -->|visit| UT
UT -->|send uData| UA(uAnalyzer的前置数据收集模块)
UA -->|pull uData| DP(uAnalyzer的数据处理模块, 以`Flink+Druid`为主)
graph LR
C(uAnalyzer解析数据) -->|sink| D(Druid: new_user)
D -->|query| E(uAnalyzer查询新增用户数服务)
TA
的客户群体说明
uTracker
,并使用我的Saas版TA
系统;uAnalyzer
系统中做统计分析;最后:TA
系统有10W+的客户在用,假设大多数的客户都有以上两个企业属性;
注: 客户A, B其实可类比为亚马逊、淘宝,他们分属不同国家,其用户群体涵盖的是全球范围;
问题描述:
TA
系统中的用户新增
指标提出问题;用户新增
指标定义:在客户查询的时间区间内该客户APP新增的用户数量;客户A是美国客户,其常用的时区为UTC-5;
客户B是中国客户,其常用时区为UTC+8;
客户A,B的用户新增
指标在同一张Druid
表(new_user)中;
Druid
表new_user
采用的时区是UTC+8,粒度是DAY
;
客户A的APP所对应的用户主要来自两个区域美国本地UTC-5,中国UTC+8; 客户B的APP所对应的用户同样来自两个区域美国本地UTC-5,中国UTC+8;
问题:对于一条美国当地上报的用户数据,若其uTs
在当地时间(UTC-5时区)是2018-11-13 12:00:00
则对应的UTC+8时间是2018-11-14 01:00:00
;其解析到Druid
表时是按照Druid
表设置的时区来做aggregate
的,当前Durid表时区配置是UTC+8
,则最终入库到Druid
时会将UTC-5时间2018-11-13 12:00:00
的数据解析到 到2018-11-14 00:00:00
的segment(Druid
表粒度是’DAY’),这就照成了在查询美国时间2018-11-13
日的新增用户数
实际是2018-11-12 11:00:00
到2018-11-13 13:00:00
的数据。
其实客户需求很简单,按照客户指定的时间区间和时区查询所有新增用户数量(即:全球范围内的新增用户数);
dataTs
:日志产生的时间,日志生成时的设备时间戳;dataZone
:接收到的数据中包含的时区,从设备时钟获取;druidZoneConf
:Druid表创建时所设置的时区;appZoneConf
:查询APP指标时期望用的时区;示例:假设某APP的用户分布在中国和美国,其上报数据的时区dataZone
中包含UTC+8, UTC-5两种时区,Druid表配置的时区druidZoneConf
是UTC+8,APP设置的查询所用时区appZoneConf
是UTC-5;
在处理每条原始日志时对dataTs
进行重新校准,校准好的newTimestamp
做为Druid入库的时间戳;
伪代码:
/**
* Zone(UTC+8) - Zone(UTC-5) = 8 - (-5) = 13
*/
def calibrate = (Zone a, Zone b) => a - b
val newTimestamp = dts + calibrate(appZoneConf, druidZoneConf)
优点:支持的所有的granularity(时间粒度:秒,分,时,日,周,月,年),代码变动量小,可沿用当前的所有Druid表;
缺点:历史数据没办法处理;APP在创建时必须选择时区且后续无法变更(已入库数据无法适配变更);
在数据解析并写入Druid的过程中不对时间戳&时区做干扰处理,在指标查询过程中针对时区进行查询区间的换算;
示例:Druid表的存储数据所用的时区是UTC+8,美国用户查看指标采用的时区是UTC-5,查询的时间区间是[2018-11-11, 2018-11-13],根据时区差异校准后的数据查询区间为[2018-11-11 13:00:00, 2018-11-13 13:00:00];(druid有针对时区的查询方案,但也要求<=小时
的粒度)
时间校准伪代码:
/**
* Zone(UTC+8) - Zone(UTC-5) = 8 - (-5) = 13
*/
def calibrate = (Zone a, Zone b) => a - b
val newStartTime = startTime + calibrate(appZoneConf, druidZoneConf)
val newEndTime = endTime + calibrate(appZoneConf, druidZoneConf)
注:Druid
本身的查询也支持时区的转换,可参照如下链接: http://druid.io/docs/latest/querying/granularities.html
优点:不修改原始数据的时间戳,不造成歧义容易理解;筛选数据所用时区可随意变更,无历史数据迁移问题;
缺点:只支持部分granularity(时间粒度:秒,分,时),其他粒度(天,月,周,年)不支持,需要对表进行重新的设计规划;
此外:一些国家使用非整数的时区,如:印度使用的是UTC +5:30,这时在使用方案二前提下无法去查询roll-up by HOUR
的表;
若在系统建设初期没考虑到跨时区等问题,到后期再更正就有存在比较大的工作量投入,且牵涉到历史数据处理迁移的问题;方案一是较为简单治标方案,比较适用于已有系统的变更;方案二则是一个比较能治标治本的方案,但需要在项目初期规划时候就考量进去,同时遇到数据量特别大的场景也需要将性能纳入考量,毕竟对同一个Druid Schema
来讲粒度从DAY
变为HOUR
数据最大增长24倍;