近期公司内部的Flink Job从Standalone迁移OnYarn时发现Job性能较之前有所降低,迁移前有8.3W+/S的数据消费速度,迁移到Yarn后分配同样的资源但消费速度降为7.8W+/S,且较之前的消费速度有轻微的抖动;经过简单的原因分析和测试验证得以最终解决,方案概述:在保持分配给Job的资源不变的情况下将总Container数量减半,每个Container持有的资源从1C2G 1Slot变更为2C4G 2Slot。本文写作的起因是经历该问题后发现深入理解Slot和Flink Runtime Graph是十分必要的;本文主要分为两个部分,第一部分详细的分析Flink Slot与Job运行关系,第二部详细的说下遇到的问题和解决方案。
Flink集群是由JobManager(JM), TaskManager(TM)两大组件组成的,每个JM/TM都是运行在一个独立的JVM进程中,JM相当于Master 是集群的管理节点,TM相当于Worker 是集群的工作节点,每个TM最少持有1个Slot,Slot是Flink执行Job时的最小资源分配单位,在Slot中运行着具体的Task任务。对TM而言:它占用着一定数量的CPU和Memory资源,具体可通过taskmanager.numberOfTaskSlots, taskmanager.heap.size来配置,实际上taskmanager.numberOfTaskSlots只是指定TM的Slot数量并不能隔离指定数量的CPU给TM使用,在不考虑Slot Sharing(下文详述)的情况下一个Slot内运行着一个SubTask(Task实现Runable,SubTask是一个执行Task的具体实例),所以官方建议taskmanager.numberOfTaskSlots配置的Slot数量和CPU相等或成比例;当然,我们可以借助Yarn等调度系统,用Flink On Yarn的模式来为Yarn Container分配指定数量的CPU资源以达到较严格的CPU隔离(Yarn采用Cgroup做基于时间片的资源调度,每个Container内运行着一个JM/TM实例);taskmanager.heap.size用来配置TM的Memory,如果一个TM有N个Slot,则每个Slot分配到的Memory大小为整个TM Memory的1/N,同一个TM内的Slots只有Memory隔离,CPU是共享的;对Job而言:一个Job所需的Slot数量大于等于Operator配置的最大Parallelism数,在保持所有Operator的slotSharingGroup一致的前提下Job所需的Slot数量与Job中Operator配置的最大Parallelism相等。
关于TM/Slot之间的关系可以参考如下从官方文档截取到的三张图:
图一: Flink On Yarn的Job提交过程,从图中我们可以了解到每个JM/TM实例都分属于不同的Yarn Container,且每个Container内只会有一个JM或TM实例;通过对Yarn的学习我们可以了解到每个Container都是一个独立的进程,一台物理机可以有多个Container存在(多个进程),每个Container都持有一定数量的CPU和Memory资源,而且是资源隔离的,进程间不共享,这就可以保证同一台机器上的多个TM之间是资源隔离的(Standalone模式下同一台机器下若有多个TM是做不到TM之间的CPU资源隔离)。
图二: Flink Job运行图,图中有两个TM,各自有3个Slot,2个Slot内有Task在执行,1个Slot空闲;若这两个TM在不同Container或容器上则其占用的CPU和Memory是相互隔离的;TM内多个Slot间是各自拥有 1/3 TM的Memory,共享TM的CPU,网络(Tcp:ZK, Akka, Netty服务等),心跳信息,Flink结构化的数据集等。
图三: Task Slot的内部结构图,Slot内运行着具体的Task,它是在线程中执行的Runable对象(每个虚线框代表一个线程),这些Task实例在源码中对应的类是org.apache.flink.runtime.taskmanager.Task;每个Task都是由一组Operators Chaining在一起的工作集合,Flink Job的执行过程可看作一张DAG图,Task是DAG图上的顶点(Vertex),顶点之间通过数据传递方式相互链接构成整个Job的Execution Graph。
Operator Chain是指将Job中的Operators按照一定策略(例如:single output operator可以chain在一起)链接起来并放置在一个Task线程中执行;Operator Chain默认开启,可通过StreamExecutionEnvironment.disableOperatorChaining()关闭,Flink Operator类似Storm中的Bolt,在Strom中上游Bolt到下游会经过网络上的数据传递,而Flink的Operator Chain将多个Operator链接到一起执行,减少了数据传递/线程切换等环节,降低系统开销的同时增加了资源利用率和Job性能;实际开发过程中需要开发者了解这些原理并能合理分配Memory和CPU给到每个Task线程。
注: 【一个需要注意的地方】Chained的Operators之间的数据传递默认需要经过数据的拷贝(例如:kryo.copy(…)),将上游Operator的输出序列化出一个新对象并传递给下游Operator,可以通过ExecutionConfig.enableObjectReuse()开启对象重用,这样就关闭了这层copy操作,可以减少对象序列化开销和GC压力等,具体源码可阅读org.apache.flink.streaming.runtime.tasks.OperatorChain与org.apache.flink.streaming.runtime.tasks.OperatorChain.CopyingChainingOutput;官方建议开发人员在完全了解reuse内部机制后才使用该功能,冒然使用可能会给程序带来bug。
Operator Chain效果可参考如下官方文档截图:
图四: 图的上半部分是StreamGraph视角,有Task类别无并行度,如图:Job Runtime时有三种类型的Task,分别是Source->Map, keyBy/window/apply, Sink,其中Source->Map是Source()和Map()chaining在一起的Task;图的下半部分是一个Job Runtime期的实际状态,Job最大的并行度为2,有5个SubTask(即5个执行线程);若没有Operator Chain,则Source()和Map()分属不同的Thread,Task线程数会增加到7,线程切换和数据传递开销等较之前有所增加,处理延迟和性能会较之前差。补充:在slotSharingGroup用默认或相同组名时,当前Job运行需2个Slot(与Job最大Parallelism相等)。
Slot Sharing是指来自同一个Job且拥有相同slotSharingGroup(默认:default)名称的不同Task的SubTask之间可以共享一个Slot,这使得一个Slot有机会持有Job的一整条Pipeline,这也是上文我们提到的在默认slotSharing的条件下Job启动所需的Slot数和Job中Operator的最大parallelism相等的原因;通过Slot Sharing机制可以更进一步提高Job运行性能,在Slot数不变的情况下增加了Operator可设置的最大的并行度,让类似window这种消耗资源的Task以最大的并行度分布在不同TM上,同时像map, filter这种较简单的操作也不会独占Slot资源,降低资源浪费的可能性。
具体Slot Sharing效果可参考如下官方文档截图:
图五: 图的左下角是一个soure-map-reduce模型的Job,source和map是4 parallelism,reduce是3 parallelism,总计11个SubTask;这个Job最大Parallelism是4,所以将这个Job发布到左侧上面的两个TM上时得到图右侧的运行图,一共占用四个Slot,有三个Slot拥有完整的source-map-reduce模型的Pipeline,如右侧图所示;注:map的结果会shuffle到reduce端,右侧图的箭头只是说Slot内数据Pipline,没画出Job的数据shuffle过程。
图六: 图中包含source-map[6 parallelism],keyBy/window/apply[6 parallelism],sink[1 parallelism]三种Task,总计占用了6个Slot;由左向右开始第一个slot内部运行着3个SubTask[3 Thread],持有Job的一条完整pipeline;剩下5个Slot内分别运行着2个SubTask[2 Thread],数据最终通过网络传递给Sink完成数据处理。
Flink在默认情况下有策略对Job进行Operator Chain 和 Slot Sharing的控制,比如:将并行度相同且连续的SingleOutputStreamOperator操作chain在一起(chain的条件较苛刻,不止单一输出这一条,具体可阅读org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable(...)),Job的所有Task都采用名为default的slotSharingGroup做Slot Sharing。但在实际的需求场景中,我们可能会遇到需人为干预Job的Operator Chain 或 Slot Sharing策略的情况,本段就重点关注下用于改变默认Chain 和 Sharing策略的API。
StreamExecutionEnvironment.disableOperatorChaining(): 关闭整个Job的Operator Chain,每个Operator独自占有一个Task,如上图四所描述的Job,如果disableOperatorChaining则 source->map会拆开为source(), map()两种Task,Job实际的Task数会增加到7;这个设置会降低Job性能,在非生产环境的测试或profiling时可以借助以更好分析问题,实际生产过程中不建议使用。
someStream.filter(…).map(…).startNewChain().map(): startNewChain()是指从当前Operator[map]开始一个新的chain,即:两个map会chaining在一起而filter不会(因为startNewChain的存在使得第一次map与filter断开了chain)。
someStream.map(…).disableChaining(): disableChaining()是指当前Operator[map]禁用Operator Chain,即:Operator[map]会独自占用一个Task。
someStream.map(…).slotSharingGroup(“name”): 默认情况下所有Operator的slotGroup都为default,可以通过slotSharingGroup()进行自定义,Flink会将拥有相同slotGroup名称的Operators运行在相同Slot内,不同slotGroup名称的Operators运行在其他Slot内。
Operator Chain有三种策略ALWAYS, NEVER, HEAD,详细可查看org.apache.flink.streaming.api.operators.ChainingStrategy;startNewChain()对应的策略是ChainingStrategy.HEAD(StreamOperator的默认策略),disableChaining()对应的策略是ChainingStrategy.NEVER,ALWAYS是尽可能的将Operators chaining在一起; 在通常情况下ALWAYS是效率最高,很多Operator会将默认策略覆盖为ALWAYS,如filter, map, flatMap等函数。
JOB说明:
类似StreamETL,100 parallelism,即:一个流式的ETL Job,不包含window等操作,Job的并行度为100;
环境说明:
Standalone下的Job Execution Graph:10TMs * 10Slots-per-TM ,即:Job的Task运行在10个TM节点上,每个TM上占用10个Slot,每个Slot可用1C2G资源,GCConf:-XX:+UseG1GC -XX:MaxGCPauseMillis=100;
OnYarn下初始状态的Job Execution Graph:100TMs * 1Slot-per-TM,即:Job的Task运行在100个Container上,每个Container上的TM持有1个Slot,每个Container分配1C2G资源,GCConf:-XX:+UseG1GC -XX:MaxGCPauseMillis=100;
OnYarn下调整后的Job Execution Graph:50TMs * 2Slot-per-TM,即:Job的Task运行在50个Container上,每个Container上的TM持有2个Slot,每个Container分配2C4G资源,GCConfig:-XX:+UseG1GC -XX:MaxGCPauseMillis=100;
注: OnYarn下使用了与Standalone一致的GC配置,当前Job在Standalone或OnYarn环境中运行时,YGC, FGC频率基本相同,OnYarn下单个Container的堆内存较小使得单次GC耗时减少,生产环境中大家最好对比下CMS和G1,选择更好的GC策略,当前上下文中暂时认为GC对Job性能影响可忽略不计。
问题分析:
引起Job性能降低的原因不难定位,贴一张Container的线程图(VisualVM中的截图)图7:在一个1C2G的Container内有126个活跃线程,守护线程78个;首先,在一个1C2G的Container中运行着126个活跃线程,频繁的线程切换是会经常出现的,这让本来不就不充裕的CPU显得更加的匮乏;其次,真正与数据处理相关的线程是红色画笔圈起的14条线程(2条Kafka Partition Consumer,Consumers和Operators包含在这个两个线程内;12条Kafka Producer线程,将处理好的数据sink到Kafka Topic),这14条线程之外的大多数线程在相同TM不同Slot间可以共用,比如:ZK-Curator, Dubbo-Client, GC-Thread, Flink-Akka, Flink-Netty, Flink-Metrics等线程,完全可以通过增加TM下Slot数量达到多个SubTask共享的目的;此时我们会很自然的得出一个解决办法:在Job使用资源不变的情况下,减少Container数量的同时增加单个Container持有的CPU, Memory, Slot数量,比如上文环境说明中从方案2调整到方案3,实际调整后的Job运行稳定了许多且消费速度与Standalone基本持平。

注:
当前问题是公司内迁移类似StreamETL的Job时遇到的,解决方案简单不带有普适性,对于带有window算子的Job需要更仔细缜密的问题分析;
当前公司Deploy到Yarn集群的Job都配置了JMX/Prometheus两种监控,单个Container下Slot数量越多每次scrape的数据越多,实际生成环境中需观测是否会影响Job正常运行,在测试时将Container配置为3C6G 3Slot时发现一次java.lang.OutOfMemoryError: Direct buffer memory的异常,初步判断与Prometheus Client相关,可适当调整JVM的MaxDirectMemorySize来解决,异常如下图8:

Operator Chain是将多个Operator链接在一起放置在一个Task中,只针对Operator;Slot Sharing是在一个Slot中执行多个Task,针对的是Operator Chain之后的Task;这两种优化的都充分利用了计算资源,减少了不必要的开销,提升了Job的运行性能。此外,Operator Chain的源码在streaming包下,只在流处理任务中有这个机制;Slot Sharing在flink-runtime包下,似乎应用更广泛一些(具体没考究)。
最后,只有充分的了解Slot,Operator Chain,Slot Sharing是什么以及各自的作用和相互间的关系,才能编写出优秀的代码并高效的运行在集群上。
结尾:贴一张官方讲解Slot的详图供大家参考学习:
Flink是由公司前辈引入,最初版本是Flink 1.1.5,后升级到Flink 1.3.2一直以来Flink Job都是跑在Standalone集群上的,为规避Standalone集群中Job相互影响的问题(Standalone无法做资源隔离),前辈们又新做了Flink集群,将互相影响的Job拆分到不同集群中,这时团队内部有多个Flink Standalone集群,虽然解决了部分Job互相影响的问题,但有些服务是和Kafka,Druid部署在同一台物理机器的(不同进程),又出现过几次不同服务互相影响的情况;这些Flink Standalone集群维护/升级起来也挺麻烦的,同时新Job越来越多,生活苦不堪言~.~;兄弟们从2018年初就喊着OnYarn,结果由于人力、机器、业务迭代等各种问题一拖再拖;到2018年底终于开始了这项工作,虽然只1个人力投入,经历2-3个月的时间团队的streaming Job已经稳定运行在OnYarn模式下,本文就重点记录下这段时所遇到的一些重点问题,当前生产运行版本是Flink 1.7.1 + Hadoop 3.1.0
由于官方没有依赖Hadoop 3.1.0的Flink下载包,所以这步需要自行编译Flink源码,可参考官方Doc:https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html;建议用公司内部环境(Jenkins, JDK, etc.)并结合官方Doc来完成打包。
简单列下我司打包的Jenkins配置:
Branches to build: refs/tags/release-1.7.1
Repository URL: git@gitlab.company.com:git/flink.git
Build - Goals and options: clean install -DskipTests -Dfast -Dhadoop.version=3.1.0
Post Steps - Execute shell: cd flink-dist/target/flink-1.7.1-bin
tar -czvf flink-1.7.1-hadoop31-scala_2.11.tar.gz flink-1.7.1/
报错:
使用./bin/flink run -m yarn-cluster的方式提交Job时出现如下报错:
client端:
------------------------------------------------------------
The program finished with the following exception:
java.lang.RuntimeException: Error deploying the YARN cluster
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:556)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:72)
at org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:962)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:243)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
Caused by: java.lang.RuntimeException: Couldn`t deploy Yarn cluster
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:443)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:554)
... 12 more
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
server端:
Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-5] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
java.lang.VerifyError: (class: org/jboss/netty/channel/socket/nio/NioWorkerPool, method: newWorker signature: (Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;) Wrong return type in function
at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:295)
at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:251)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
原因:
Flink 1.3.2依赖的Netty3的版本较低,与Hadoop 3.1.0不兼容导致的报错;不兼容的Netty版本为:Netty-3.4.0.Final 和Netty-3.10.6.Final;曾尝试通过修改NioWorkerPool的源文件并编译新的class文件以解决这个问题,但以失败告终;
解决:
升级Flink版本;经测试:Flink 1.6, 1.7 + Hadoop 3.1.0编译出的包是可以正常提交给Hadoop 3.1.0集群的;
结论:
Flink 1.3.2与Hadoop 3.1.0存在兼容性问题,若需提交Flink Job到Hadoop 3.1.0及以上,最好使用Flink 1.6+
Flink类型系统和序列化方式可关注官方doc:
https://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html
Flink默认支持PoJoSerializer,AvroSerializer,KryoSerializer三种序列化,官方推荐使用Flink自己实现的PojoSerializer序列化(在遇到无法用PoJoSerializer转化的时默认用KryoSerializer替代,具体可关注GenericTypeInfo),Kryo性能较Avro好些,但无供跨语言的解决方案且序列化结果的自解释性较Avro差;经过功能性能等多方面的调研,计划Task间数据传递用Kryo,提高数据序列化性能,状态的checkpoint,savepoint用Avro,让状态有更好的兼容&可迁移性;此功能从Flink 1.7+开始支持具体关注doc:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/custom_serialization.html
OK,接下来说遇到的在Avro下的对象序列化异常,本次bug和上边的内容无关~,~. 这个报错只是因为Flink-Avro不支持null的序列化;
报错:
开启Avro序列化(enableForceAvro()),Task间通过网络传递数据对象时,若对象内部有field为null,数据无法正常序列化抛出如下异常:
java.lang.RuntimeException: in com.app.flink.model.Line in string null of string in field switchKey of com.app.flink.model.Line
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:104)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:83)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:41)
... more
Caused by: java.lang.NullPointerException: in com.app.flink.model.Line in string null of string in field switchKey of com.app.flink.model.Line
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.flink.api.java.typeutils.runtime.AvroSerializer.serialize(AvroSerializer.java:135)
... 18 more
Caused by: java.lang.NullPointerException
at org.apache.avro.specific.SpecificDatumWriter.writeString(SpecificDatumWriter.java:65)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:76)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
原因:
Flink在初始化Avro Serializer时使用org.apache.avro.reflect.ReflectData构造DatumReader和DatumWriter,ReflectData本身不支持null的序列化(Avro有提供ReflectData的扩展类org.apache.avro.reflect.AllowNull来支持null的序列化,但Flink并未使用),从而导致null field的序列化异常;而Kryo默认支持null field,程序可正常运行;Flink 1.3.2下代码截图如下:
Avro:

Kryo:

Flink 1.7后对集成Avro的代码变更较大,但依旧会出现当前Bug,代码逻辑入口:

解决:
field值赋默认值;Kryo(enableForceKryo())序列化;enableForceXXX()尝试下默认的PoJoSerializer;本问题在Kafka0.8 + Flink 1.7(cannot guarantee End-to-End exactly once)环境下造成了数据丢失的问题;
报错:
异常一:kafka异常(导致了Job重启)
Caused by: java.lang.Exception: Could not complete snapshot 69 for operator wep
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
... 8 more
Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:375)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:363)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)
... 13 more
异常二:Job重启,Dubbo异常
com.alibaba.dubbo.rpc.RpcException: Rpc cluster invoker for interface com.etl.commons.api.GenericService on consumer xxx.xx.xx.xx use dubbo version flink is now destroyed! Can not invoke any more.
at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.checkWheatherDestoried(AbstractClusterInvoker.java:233)
at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:215)
at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72)
at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
at com.alibaba.dubbo.common.bytecode.proxy0.queryBaseDimension(proxy0.java)
过程:
Kafka抖动或异常导致Flink sink data to Kafka时丢失Topic Leader(出现异常一),进而导致Flink Job重启;
Flink Job重启正常,Job状态是running,但flink operator中的Dubbo未初始化导致无法提供服务(在Flink Job重启过程中由于DubboService.java中代码不规范而导致Dubbo服务未初始化,Dubbo服务不可用);
Flink Source开始消费数据,当需调用含有DubboService的Operator处理数据时抛异常(出现异常二),数据无法正常处理并下发到下一个ChainOperator;
上游数据不断被消费,但无法正常处理并下发,这就照成的Source端丢数的现象;
原因:
在Standalone模式下Flink Job的失败自动重启Dubbo服务可以正常初始化的,但在OnYarn模式下初始化Dubbo服务会不可用;
Flink Standalone下失败重启过程中,DubboService会先卸载再重新加载,Dubbo服务正常初始化;
OnYarn下失败重启的过程中类DubboService不会被JVM卸载,同时代码书写不规范,导致内存驻留了已被closed的DubboService;
调用已经closed的Dubbo服务则数据无法正常处理并抛异常;
问题代码:
private static DubboService dubboService;
public static void init() {
if (dubboService == null) {
Dubbo.init();
logger.info("Dubbo initialized.");
} else {
logger.info("Dubbo already existed.");
}
}
// 未对dubboService做置空处理
public static void destroy() {
dubboService.destroy();
}
解决:
在destory方法中清除所有static的共享变量,使得job重启时候可以正常重新初始化Dubbo服务,如下代码:
修改代码:
public static void destroy() {
dubboService.destroy();
// 置空
dubboService = null;
}
补充:
OnYarn的Flink Job重启时候是不会释放Container并重新申请的,自动重启前后的JobId保持不变;
Standalone模式下,User code的是在TaskManager环境中加载启动的用的ClassLoader是:org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
OnYarn模式下(bin/flink run -m yarn-cluster),User Code和Flink Framework一并提交到Container中,User Code加载用的ClassLoader是:sun.misc.Launcher$AppClassLoader;
由于ClassLoader不同,导致User Code卸载时机不同,也就导致在OnYarn模式下Restart Job,DubboService初始化异常;
可参考Flink Docs:https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/debugging_classloading.html
注: coding时候尽量用RichFunction 的open()/close()方法初始化对象,尽可能用(单例)对象持有各种资源,减少使用static(生命周期不可控),及时对static修饰对象手动置空;
解决该问题需要对Flink的Task数据交换&内存管理有深刻的理解,可参考文章: https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
环境:
Job的运行Topology图:
graph LR
A(Task-Front `parallelism:30`) -->|shuffle| D(Task-End `parallelism:90`)
Task-Front负责从Kafka读取数据、清洗、校验、格式化,其并行度为30;
Task-End负责数据的解析&处理,并发送到不同的Kafka Topic;
Task-Front并行度为30,Task-End并行度为90;
Task-Front通过网络IO将数据传递给Task-End;
Job压测环境:
Flink 1.3.2的Standalone模式;
2个JobManager高可用;
8个TaskManager(一台物理机一个TaskManager),主要配置如下:
taskmanager.numberOfTaskSlots: 13
taskmanager.network.numberOfBuffers: 10240
taskmanager.memory.segment-size: 32768 # 32K
taskmanager.memory.size: 15g
问题:
在Standalone下由于taskmanager.network.numberOfBuffers配置数量过小导致data exchange buffer不够用,消费Kafka数据的速度减缓并出现大幅度的抖动,严重情况下导致FlinkKafkaConsumer停止消费数据,从JMX监控看到如下现象:
:

分析:
从JMX监控看,主要是因为Buffer原因导致的线程wait,cpu抖动;
共8台机器,每台机器上有13个Slot,根据Job是30 + 90的并行度来看,每台机器上同时存在多个Task-Front和Task-End;
Task-Front需将处理好的数据存储到ResultPartition;
Task-End需将接收到的数据存储到InputGate以备后续处理;
Task-End将处理好的数据直接发送给Kafka Topic,所以不需要ResultPartition;
一个Task-Front需将自己处理的数据shuffle到90个Task-End;
通过Flink Buffer分配原理可知,需要30*90个ResultPartition,90*30个InputGate;
若平均分则每个ResultPartition/InputGate可分配到的Buffer数量是:
(10240 * 8) / (30*90 + 90 * 30) ≈ 15个(每个32k);
在8w+/s, 5~10k/条的数据量下的15个Buffer会在很短时间被填满,造成Buffer紧缺,Task-Front无法尽快将数据shuffle到下游,Task-End无法获取足够的数据来处理;
解决:
增大taskmanager.network.numberOfBuffers数量,最好保证每个ResultPartition/InputGate分配到90+(压测预估值,不保证效果最佳)的Buffer数量;
补充:
在OnYarn下几乎不会遇到该为题,了解该问题有助于理解Flink内存管理&数据传递;
Flink 1.5之后Buffer默认用堆外内存,并deprecated了taskmanager.network.numberOfBuffers,用taskmanager.network.memory.max与taskmanager.network.memory.min代替;
关于Buffer分配&动态调整等逻辑可关注以下几个类:TaskManagerRunner / Task / NetworkEnvironment / LocalBufferPool / InputGate / ResultPartition / MemorySegment,在此不做源码解读,最后贴两个截图:

Standalone与OnYarn下的日志配置和输出路径有较大的区别,Standalone下直接在maven的resources目录下配置log4j2.xml/log4j.properties即可完成日志配置,但在OnYarn下 需要通过修改${flink.dir}/conf/log4j.properties的配置来定义日志输出,这些配置在container.sh启动TaskManager/JobManager时被加载以初始化log4j,配置如下:
# Log all infos in the given file
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
# ${log.file} input by luanch_container.sh
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.encoding=UTF-8
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
${flink.dir}/conf下不同log配置文件所适用的范围如下图:

${flink.dir}中默认不包含DailyRollingFileAppender的依赖,所以在使用DailyRollingFileAppender时还需要添加依赖apache-log4j-extras-1.2.17.jar到${flink.dir}/lib/目录,Flink Job的日志会输出到Yarn Container的日志目录下(由yarn-site.xml中的yarn.nodemanager.log-dirs指定),Container的启动脚本(可在YarnUI log中找到)示例:
// log.file 即日志输出目录
exec /bin/bash -c "$JAVA_HOME/bin/java -Xms1394m -Xmx1394m -XX:MaxDirectMemorySize=654m -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -Dlog.file=/home/hadoop/apache/hadoop/latest/logs/userlogs/application_1547994202702_0027/container_e02_1547994202702_0027_01_000002/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> /home/hadoop/apache/hadoop/latest/logs/userlogs/application_1547994202702_0027/container_e02_1547994202702_0027_01_000002/taskmanager.out 2> /home/hadoop/apache/hadoop/latest/logs/userlogs/application_1547994202702_0027/container_e02_1547994202702_0027_01_000002/taskmanager.err"
运行中的FlinkOnYarn Job日志查看:
直接通过YarnUI界面查看每个Container的日志输出,如下图:

在container所在节点的对应目录下通过tail, less等shell命令查看日志文件,如下图:

在hadoop配置enable log aggregation时,可以通过yarn logs -applicationId ${application_id}获取log;
已完成的FlinkOnYarn Job日志查看:
在配置hadoop log aggregation时,可以通过yarn logs -applicationId ${application_id}获取log;
注: Spark中HistoryServer支持用户从YarnUI中查看已完成Job的日志等,但Flink中的HistoryServer在OnYarn下不可用,留下问题后续解决;
FlinkUI的指标监控使用起来不够方便,同时在Job比较大时往FlinkUI上添加监控指标时卡顿现象非常明显,需要选择一个更好的Job监控工具来完成Job的监控任务,Prometheus是一个很好的选择;具体说明可参考官方Doc:https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html;
Flink从1.4版本开始支持Prometheus监控PrometheusReporter,在Flink1.6时有加入了PrometheusPushGatewayReporter的功能;
PrometheusReporter是Prometheus被动通过Http请求Flink集群以获取指标数据;
PrometheusPushGatewayReporter是Flink主动将指标推送到Gateway,Prometheus从Gateway获取指标数据;
OnYarn下Flink端开启Prometheus监控的配置:
// Flink可以同时定义多个reportor,本示例定义了jmx和prometheus两种reportor
// JMX用RMI,Prom用Http
metrics.reporters: my_jmx_reporter,prom
metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9045
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9275
当Job分配到Yarn Container启动JobManager/TaskManager的过程中会实例化metrics.reporter.xxx.class配置的类,启动监控服务;
PrometheusReporter是通过暴漏Http地址来提供监控指标访问功能的,在每个JobManager/TaskManager启动的过程中都会实例化一个PrometheusReporter对象,并启动HttpServer服务绑定到一个端口上
(metrics.reporter.prom.port配置的区间从小到大第一个未占用的端口);因此metrics.reporter.prom.port配置的区间可允许的最大端口数必须大于等于一台机器最大可启动的Container数量,
否则会有监控服务启动失败的报错,如下:
报错:
2019-03-12 20:44:39,623 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - Configuring prom with {port=9250, class=org.apache.flink.metrics.prometheus.PrometheusReporter}.
2019-03-12 20:44:39,627 ERROR org.apache.flink.runtime.metrics.MetricRegistryImpl - Could not instantiate metrics reporter prom. Metrics might not be exposed/reported.
java.lang.RuntimeException: Could not start PrometheusReporter HTTP server on any configured port. Ports: 9250
at org.apache.flink.metrics.prometheus.PrometheusReporter.open(PrometheusReporter.java:70)
at org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:153)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:137)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:330)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner$1.call(TaskManagerRunner.java:301)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner$1.call(TaskManagerRunner.java:298)
本文简单总结了Flink使用过程中遇到的几个重要的问题,taskmanager.network.numberOfBuffers配置的问题有在stackoverflow, FlinkCommunity问过,但没得到较为准确的答案,最终通过gg文档&源码才逐渐理解,源码&Google的重要性不言而喻!
第一次听说协变&逆变是刚接触Scala的时候,但协变&逆变却不是Scala所特有的,Java, C#等语言也有协变&逆变的概念,本文首先会解释协变&逆变这两个术语的含义,然后进一步探讨它们在Scala中具体的书写语法和使用场景。
协变&逆变的概念入门可以关注下边这两个篇文章,本段内容也是学习这两篇文章所得到的:Covariance and Contravariance of Hosts and Visitors, Cheat Codes for Contravariance and Covariance。
实体:Fuel, Plant, Bamboo;图中的符号>表示可替代性(类比多态:子类可作为父类使用),Plant可以做为这一种Fuel(燃料),Bamboo是一种Plant,在作为燃料的这件事情上也可以认为 Bamboo extends Plant, Plant extends Fuel;Box:用于实体(Fuel, Plant, Bamboo)的存放;

消费者:Burn, Animal, Panda;
/** box contains entities */
class Box<T> {
/**
* 若有一个Box<Plant>,可以用BoxVisitor<Fuel,_>, BoxVisitor<Plant,_>来消费他,不能用BoxVisitor<Bamboo,_>来消费;
* 对于BoxVisitor来讲T是逆变的,只能接收类型为T或T父类的BoxVisitor,即BoxVisitor<? super T>;
*
*/
public <R> R accept(BoxVisitor<? super T, ? extends R> visitor) {
return null;
}
}
/** consume entities in box */
class BoxVisitor<U, R> {
public R visit(Box<? extends U> box) {}
}
class Burn<R> extends BoxVisitor<Fuel, R> {
/**
* 对于Burn而言,它可消费Fuel,Plant,Bamboo,所以visit可接收任何有Fuel属性的Box;
* 由此Box<T>可以被认为是`协变`的,即如果T extends U 则 Box<T> extends Box<U>
*/
public R visit(Box<? extends Fuel> box) {}
}
Burn可以燃烧一切包括:Fuel, ` Plant, Bamboo;Animal可以吃所有类型的植物包括:Plant, Bamboo;Panda只吃一种植物:Bamboo`。由此,可得出下面这关系张图:

如果消费者是Burn,我可以把Box<Fuel>, Box<Plant>, Box<Bamboo>传递给它;如果消费者是Animal,我可以把Box<Plant>, Box<Bamboo>传递给它;如果消费者是Panda,我可以把Box<Bamboo>传递给它。
对于Box<T>而言,它存储的实体对象用于被使用/消费,所以它是一个生产者,若能将Box<Fuel>传给某个BoxVisitor,则也能将Box<Plant>, Box<Bamboo>传递给这个BoxVisitor,这时Box<Fuel>中的泛型是上界(upper bound),需要用extends关键字来定义Box<T>的范围,该范围能被上文中的BoxVisitor消费。即,若BoxVisitor能接收装满T的Box,则肯定能接收装满T子类的Box(Box<T>是生产者,remember: Prdoucer Extends),Box<T>随着它的泛型T进行协变,协变的简单定义:
如果:
graph LR
A(Class A) -->|extends| B(Class B)
则:
graph LR
A(Box < A >) -->|extends| B(Box < B >)
注::Java中的数组是协变的,如下列Java代码可正常编译,但运行时会抛出ArrayStoreException:
Box.Bamboo[] bamboos = new Box.Bamboo[1];
bamboos[0] = new Box.Bamboo();
Stream.of(bamboos).forEach(System.out::println);
Box.Plant[] plants = bamboos;
plants[0] = new Box.Plant();
// throw ArrayStoreException at running stage.
Stream.of(plants).forEach(System.out::println);
Fuel的Box,传递给Burn(汽油只可以燃烧);可以将装满Plant的Box传递给Burn, Animal(植物可以燃烧&吃);可以将装满Bamboo的Box传递给Burn, Animal, Panda(竹子可以燃烧,可以给动物&熊猫吃)。由此我们可以得出下面的这张图:

如果我有Box<Fuel>,我可以把它传递消费者Burn;如果我有Box<Plant>,我可以把它传递给消费者Animal, Burn;如果我有Box<Bamboo>,我可以把它传递给消费者Panda, Animal, Burn。
对于BoxVisitor<T>本身而言,它是一个可消费<T>的消费者,若一个Box<Bamboo>能被Panda(即: BoxVisitor<Bamboo>)消费,则其必然也能被Animal(即: BoxVisitor<Plant>), Burn(即: BoxVisitor<Fuel>)消费,BoxVisitor<Bamboo>中的泛型Bamboo是BoxVisitor的下界(lower bound),需用supper来定义BoxVisitor<T>中T的取值范围,这个范围内的BoxVisitor都可用于消费Box<T>(BoxVisitor<T>是消费者,remember: Consumer Super),BoxVisitor<T>是逆变的,逆变简单定义:
如果:
graph LR
A(Class A) -->|extends| B(Class B)
则:
BoxVisitor<Parent> extends BoxVisitor<Child>
graph LR
B(BoxVisitor < B >) -->|extends| A(BoxVisitor < A >)
协变&逆变在Java中并没有上升到语法层面,而Scala在语法层面对协变&逆变进行的详细的语法设计,Scala中用类型参数[T]表示不变,[+T]表示协变,[-T]表示逆变,具体Scala中协变&逆变的语法特性&约束会在下文进行详细的分析讲解。
定义若干类结构:
class Fuel
class Plant extends Fuel
class Bamboo extends Plant
// 声明类型`Box`,其类型参数`T`是协变的
class Box[+T](ts:T*)
// 类型参数逆变
class BoxVisitor[-T]
一个类可被定义为协变的前提是:这个类型类似Producer Extends,并且是只读的(read-only),若非只读则会出现如下问题场景(ArrayStoreException异常):
// 假设Array是协变的
// 创建一个Circle数组并装入10个Circle对象
val circles: Array[Circle] = Array.fill(10)(Circle(..))
// 当Aarry[T]是协变时该行代码有效
val shapes: Array[Shape] = circles
// 将数组中第一个元素修改为Square对象(Square is a subtype of Shape)
// 若协变非read-only,此处编译通过,但运行会抛异常(向Array[Circle]添加Square对象,这是不被允许的)
shapes(0) = Square(..)
子类对象赋值给父类变量
因为Bamboo <: Plant <: Fuel而且Box[+T]是协变的,所以Box[Bamboo] <: Box[Plant] <: Box[Fuel],同时由多态原则:子类可看做父类,最终得出下边有效代码:
val bamboos: Box[Bamboo] = new Box[Bamboo](new Bamboo())
// 根据协变特性,下列代码可正常运行
val plants: Box[Plant] = bamboos
val fuels: Box[Fuel] = bamboos
协变类型内部函数限定
/**
* covariance:readonly, non-writable
* 即:入参:限定不能将A & A的子类传递给Foo
* 返回值:无限制
*/
trait Foo[+T] {
// def fun[①](②)(op: ③): ④
def fun0[U >: T](u: U)(op: T => U): U
def fun1[U >: T](u: U)(op: T => U): U => T
}
定义的协变类Foo[+T],类内部定义函数的通式包含着组成函数的所有元素:①:类型参数定义,②:对象类型入参定义,③:函数类型的入参定义,④:返回值/函数定义,在协变类里每个位置的条件限制如下:
①类型参数:不能定义子类类型[U]([U <: T]),原因:Covariant type T occurs in contravariant position in type T of value U;
②对象入参:入参的类型不能为T & subType of T,原因:Covariant type T occurs in contravariant position in type T of value u;
③函数入参:函数的出参不能为T & subType of T,原因:Covariant type T occurs in contravariant position in type U => T of value op;
④返回值:无限制;
④返回函数: 函数入参不能为T & subType of T,原因:Covariant type T occurs in contravariant position in type T => T of value fun;
In A Picture:
逆变位置:只允许定义或传入super T的类型,或是与T无父子关系的类型;
协变位置:对类型无限制,可任意定义;
个人理解:协变可看做生产者Producer Extends并且 readonly;所以协变类内部不能存在add(_ extends T)的函数,对函数的返回值则无限制,所以就有了上图中的语法约束;
逆变是与协变相对的一个概念,可以将逆变看做消费者Comsumer super,并且是write-only的,若非write-only会出现如下问题场景:
// 假设Array是逆变的
// 首先创建数组并装入Shape对象
val shapes: Array[Shape] = Array.fill(10)(Shape(..), Shape(..))
// Works only if Array is contravariant
val circles: Array[Circle] = shapes
// 编译异常,circles(0)实际是Shape对象,是Circle的父类
val circle: Circle = circles(0)
父类对象赋值给子类变量
// 由于BoxVisitor是逆变的,所以下边代码可正常编译运行
val fuelVisitor: BoxVisitor[Fuel] = new BoxVisitor[Fuel]
val plantVisitor: BoxVisitor[Plant] = fuelVisitor
val bambooVisitor: BoxVisitor[Bamboo] = fuelVisitor
// 作为消费者,能消费类型`T`则必定能消费T的父类,但不一定能消费子类,
// 因为子类在拥有所有父类可被外部访问的变量和方法的同时扩展或覆盖了父类的变量和方法
逆变类型的内部函数限定
/**
* contravariance:write-only, non-readable
* consumer super
*/
trait Foo[-T] {
// def fun[①](②)(op: ③): ④
def fun0[U <: T](u: U)(op: U => T): U
def fun1[U <: T](u: U)(op: U => T): T => U
}
在逆变类Foo[-T]中函数通式(def fun[①](②)(op: ③): ④)的各位置的条件限制如下:
①类型参数:不能定义父类类型[U]([U >: T]),原因:Contravariant type T occurs in covariant position in type T of value U;
②对象入参:无限制;
③函数入参:函数的入参不能为T & superType of T,原因:Contravariant type T occurs in covariant position in type T => U of value op;函数出参无限制;
④返回值:不能为T & superType of T,原因:Contravariant type T occurs in covariant position in type T of value fun;
④返回函数: 函数出参不能为T & superType of T,原因:Contravariant type T occurs in covariant position in type U => T of value fun1;
In A Picture:
协变位置:只允许定义或传入extends T的类型;
逆变位置:对类型无限制,可任意定义;
个人理解:逆变可看做消费者Consumer super并且 writeonly;所以协变类内部不能存在return (_ extends T)的函数,对函数的入参则无限制,所以就有了上图中的语法约束;
不变就是平时常见的类型参数,比如ListBuffer[T], ArrayBuffer[T]等都是不变的,可以对它们进行 read & write等操作,只要满足严格的类型约束要求即可;
scala中常见的协变/逆变类有:List[+T], Seq[+A], Option[+A], Future[+T], Tuple[..], Map[A, +B], Function1[-T1, +R], CanBuildFrom[-From, -Elem, +To], etc.
Function1[-T1, +R]的另一种常见写法-T1 => +R(入参逆变,出参协变)
示例
def main(args: Array[String]): Unit = {
// 协变
val list = List[String]("A", "B", "CC")
println(toInt(list))
// 协变&逆变组合
val fun = (v1: CharSequence) => v1.toString.map(_.toInt)
println(toInt(fun, "ABC"))
}
// 协变,入参类型允许的范围`List[_ <: CharSequence]`
def toInt(list: List[CharSequence]): Seq[Int] = list.map(_.length)
// 协变&逆变组合使用,第一个入参类型允许的范围`Function1[_>:String, _<:Seq[Int]]`
// 第一个入参类型范围的另一种表示:`_ >: String => _ <:Seq[Int]`
def toInt(fun: String => Seq[Int], in: String): Seq[Int] = fun(in)
本文主要对协变,逆变进行了详细的语法讲解&场景演示,但这些只是开始,大家需在平时工作中多结合应用场景多练习才能达到灵活运用的目的;此外协变,逆变只在scala编译期进行语法校验,不影响runtime,编译出的字节码会被部分裁切以满足JVM字节码规范。
本文示例的代码存储在工程:https://github.com/itinycheng/jvm-lang-tutorial ,包com.tiny.lang.java.generic.variance 和 com.tiny.lang.scala.generic.variance中。
Scala类型参数与Java的泛型体系是较相似的语言特性,但Scala类型参数体系的实现比Java更为的多样和复杂;Scala类型参数体系包含如下:[多重]上界(T <: UpperBound)、[多重]下界(T >: LowerBound)、[多重]视图界定(T <% ViewBound)、[多重]上下文界定(T : ContextBound)、类型约束(T =:= U, T <:< U, T <%< U)、协变(+T)、逆变(-T);可以认为Scala类型参数在覆盖Java泛型所有特性的基础上又做了更多的扩展延伸,即Java泛型是Scala类型参数的一个子集。本文主要讲解Scala参数类型的各种特性并辅助与Java泛型做对比分析,是一个供给Java Developer学习用的Scala类型参数入门指南。
首先定义若干类结构
trait Food
trait Medicine
class Fruit extends Food
class Apple extends Fruit
// 橘子是一种水果,橘子皮有药用价值
class Orange extends Fruit with Medicine
T <: upperBound 类型上界T >: lowerBound 类型下界T >: lowerBound <: upperBound 多重界定with 连接多个上界或多个下界用符号<:来表示类型参数的上界,如_ <: Food 表示任意类型_是Food的子类,默认情况下Scala类的上界是Any(类似Java中的Object),即任意类型参数可表示为_ <: Any。
scala支持同时声明多个上界,两个上界类型之前用with连接,例如:_ <: Fruit with Medicine给任意类型_定义了两个上界:Fruit, Medicine,在上文定义的类中只有Orange满足这个多重上界的限定。with可以多次使用,用以连接多个上界类型,语法如下:_ <: TypeA with TypeB with TypeC with ...。
用符号>:来表示类型参数的下界,如:_>:Fruit表示任意类型_需满足是Fruit的父类的上界限定,scala也支持多重上界的语法,如:_ >: Fruit with Food表示任意类型_必须同时满足是Fruit和Food的父类,上文定义类中无满足该要求的类,AnyRef可以满足这个多重上界的要求(scala中AnyRef是任意引用类型的父类)。
上界和下界一起使用可被称为多重界定,即一个类型参数时有上界和下界,如:_ >: Apple <: Food表示任意类型_需满足以Apple作为下界,同时以Food作为上界,上文定义的类中Food满足该限定;此外,一个类型参数也可以同时拥有多上界和多个下界,如:_ >: Apple with Food <: AnyRef with Serializable定义了两个下界Apple和Food,以及两个上界AnyRef和Serializable。
/**
* 函数接收一个ListBuffer类型的入参,ListBuffer存放的对象必须是Fruit子类
*/
def test0(list: ListBuffer[_ <: Fruit]): Unit = {
// compile error: list ++= List(new Fruit,new Orange,new Apple)
list.foreach(println)
}
/**
* define a generic type `T` which has an upper bounds `Fruit`
*/
def test1[T <: Fruit](list: ListBuffer[T]): Unit = {
// compile error :list ++= List(new Fruit,new Orange,new Apple)
list.foreach(println)
}
/**
* 函数接收一个ListBuffer类型入参,ListBuffer存放对象必须是Fruit父类
*/
def test2(list: ListBuffer[_ >: Fruit]): Unit = {
list ++= List(new Apple, new Fruit)
list.foreach(println)
}
/**
* @see [[test2]]
*/
def test3[T >: Fruit](list: ListBuffer[T]): Unit = {
list ++= List(new Apple, new Fruit)
list.foreach(println)
}
/**
* multi upper bounds
*/
def test4[T <: Fruit with Medicine](list: ListBuffer[T]): Unit = {
//compile error: list ++= List(new Orange)
list.foreach(println)
}
/**
* multi lower bounds
*/
def test5[T >: Fruit with Medicine](list: ListBuffer[T]): Unit = {
list ++= List(new Orange)
list.foreach(println)
}
/**
* have upper & lower bounds at the same time
*/
def test6[T >: Apple <: Food](list: ListBuffer[T]): Unit = {
// compile error: list += new Fruit
list.foreach(println)
}
/**
* multi upper & lower bounds
*/
def test7[T >: Apple with Food <: AutoCloseable with Serializable](list: ListBuffer[T]): Unit = {
// compile error: list ++= List(new Orange, new Apple)
list.foreach(println)
}
实际上: 上界对应的是Java中的extends关键字,下界对应Java中的super关键字,scala的上下界语法特性比Java的语法表现更全面;在Java中无法定义一个泛型<T super Fruit>(可能在Java看来super of every class is Object);但Java支持多重的上界,比如定义一个泛型T必须继承两个类<T extends Fruit & Medicine>,其中&与scala中的with关键字相对。将scala源码编译成字节码,然后反编译字节码进行观察发现之前定义的带有上下界的类型参数只剩下extends关键字(对应<:),而super关键字(对应>:)基本被擦除(Be Erased),从这一层面来讲,上下界是scala特有的语法层特性,是编译时特性非运行时,scala源码编译成字节码时会进行语法表达的转换和裁切,以符合JVM的字节码规范。
T <% viewBound 视图界定(deprecated from scala 2.11)用符号<%来表示视图界定,T <% Juice表示在当前.scala文件的上下文中,存在一个隐式函数可以将类型T转换为Juice。
scala语法也支持多重的视图界定,如:T <% Juice <% Soup表示类型T既可以隐式转换成Juice也可以转换成Soup。
注: 视图界定在Scala 2.11版本已经deprecated,请用隐式参数替换,如下代码示例中的test2, test3;
class Juice(food: Fruit){
def juice = "Juice"
}
class Soup(food: Fruit){
def soup = "Soup"
}
/**
* 水果可以榨汁
*/
implicit def fruitToJuice(fruit: Fruit): Juice = {
new Juice(fruit)
}
/**
* 水果可以做汤
*/
implicit def fruitToSoup(fruit: Fruit): Soup = {
new Soup(fruit)
}
// 引入隐式转换函数
import fruitToJuice, fruitToSoup
/**
* 该函数要求传入的ListBuffer中的T的对象可以隐式转换成Juice
* view bounds are deprecated,use implicit parameter instead.
* input parameter `T` can convert to `Juice`
*/
def test0[T <% Juice](list: ListBuffer[T]): Unit = list.foreach(i => println(i.juice))
/**
* multi view bound
*/
def test1[T <% Juice <% Soup](list: ListBuffer[T]): Unit = {
// 通过隐式转换成Juice而拥有juice函数
list.foreach(i => println(i.juice))
// 通过隐式转换成Juice而拥有soup函数
list.foreach(i => println(i.soup))
}
/**
* Use this function to replace [[test0()]]
*/
def test2[T](list: ListBuffer[T])(implicit fun: T => Juice): Unit = list.foreach(i => println(i.juice))
/**
* Use this function to replace [[test1()]]
*/
def test3[T](list: ListBuffer[T])(implicit f1: T => Juice, f2: T => Soup): Unit = {
list.foreach(i => println(i.juice))
list.foreach(i => println(i.soup))
}
// 调用函数
def main(args: Array[String]): Unit = {
val fruits = ListBuffer[Fruit](new Apple, new Orange, new Fruit)
test0(fruits)
test1(fruits)
test2(fruits)
test3(fruits)
}
实际上: 视图界定是scala为了让coding更简洁高效而设计出的一个语法糖,其实现是scala编译器在编译生成字节码过程中的一个自动插入隐式函数到所需位置的操作;如在上边的代码中ListBuffer[Fruit]作为入参传入test0,在foreach时Fruit对象是没有juice函数供给调用的,编译这段代码时不通过,这时编译器会从上下文中找隐式转换,找到了fruitToJuice,并调用该函数将fruit对象转换为Juice,然后在调用Juice中的juice函数(.class反编译结果类似list.foreach(i => println(fruitToJuice(i).juice)));Java中无对应特性与之对应,即每个调用必须明确,编译器并不做类似自动化的补全操作。
T: contextBound 上下文界定(存在ContextBound[T]的隐式值)上下文界定的形式为T : M,其中M是另一个泛型类,它要求必须存在一个类型为M[T]的隐式值。如:[T : Ordering]表示必须存在一个Ordering[Fruit]的隐式值,在使用隐式值的地方声明隐式参数(如:(implicit ordering: Ordering[T]))。
多重上下文界定形式为T : M : N,表示当前代码上下文中必须存在类型M[T]和N[T]两个隐式值,同样,在使用隐式值的地方可以显式的声明隐式参数。
首先定义隐式值
implicit object FruitOrdering extends Ordering[Fruit] {
override def compare(x: Fruit, y: Fruit): Int = {
x.getClass.getName.compareTo(y.getClass.getName)
}
}
/**
* alternative
*
* has the same effect as FruitOrdering, but not singleton, better use `FruitOrdering`
*/
implicit def ordering: Ordering[Fruit] = new Ordering[Fruit] {
override def compare(x: Fruit, y: Fruit): Int = x.getClass.getName.compareTo(y.getClass.getName)
}
import FruitOrdering
import scala.reflect.ClassTag
def test0[T: Ordering](first: T, second: T): Unit = {
// compile error: val arr = new Array[T](2)
println(first + ", " + second)
}
/**
* use `implicit` parameter or else `Fruit extends Ordering[Fruit]`
* 必须存在一个类型为`Ordering[T]`的隐式值
*/
def test1[T: Ordering](first: T, second: T)(implicit ordering: Ordering[T]): Unit = {
// compile error: val arr = new Array[T](2)
val small = if (ordering.compare(first, second) < 0) first else second
println(small)
}
/**
* ClassTag 用于保存运行时`T`的实际类型,`new Array[T](2)`就可以正常编译通过
*
* scala command compile result:
* `test2: [T](first: T, second: T)(implicit evidence$1: Ordering[T], implicit evidence$2: scala.reflect.ClassTag[T])Unit`
*/
def test2[T: Ordering : ClassTag](first: T, second: T)(implicit ordering: Ordering[T]): Unit = {
val arr = new Array[T](2)
val small = if (ordering.compare(first, second) < 0) first else second
println(arr + ", " + small)
}
// 调用函数
def main(args: Array[String]): Unit = {
test0(new Apple, new Orange)
test1(new Apple, new Orange)
test2(new Apple, new Orange)
}
说明:
关于隐式转换,隐式参数相关内容本文并没做太多的讲解,网上相关资料比较多,大家可以尝试自助学习下;
将上述函数test0输入到scala command得到的结果为test0: [T](first: T, second: T)(implicit evidence$1: Ordering[T])Unit,即[:Ordering]被编译成了隐式参数:implicit evidence$1: Ordering[T],这是上下文界定的特定编译方式,大家需要牢记这个编译规则;
上述函数test1中显式的添加了隐式参数implicit ordering: Ordering[T],其原因是在编码阶段函数内部需要对Ordering[T]的实例对象进行调用,不得不添加该隐式参数(编译期动态插入的隐式参数在编码阶段引用不到),test1输入到scala command得到的结果为test1: [T](first: T, second: T)(implicit evidence$1: Ordering[T], implicit ordering: Ordering[T])Unit,在当前代码上下文中main函数调用test1的入参为(Apple, Orange, FruitOrdering ,FruitOrdering) ,即单例对象FruitOrdering会同时出现在第三、四参数位上;
上述函数test2所处的上下文中并没有找到与ClassTag相对应的隐式值,这是scala编译器在编译时对ClassTag做特殊处理,在scala command下的编译的结果在预料之中,出现ClassTag的隐式参数,反编译字节码会发现new Array[T](2)被转换为classTag.newArray(2),其内部通过调用Java中的Array.newInstance(..)动态创建数组对象;main函数中调用test2的代码的入参中隐式参数ClassTag被编译成ClassTag..MODULE$.apply(Fruit.class),这是scala编译器的对ClassTag的特殊处理,大家明白是编译器行为即可。
类型约束是一种比较严格的类型限定方式:
T =:= U 表示T与U的类型相同T <:< U 表示T是U的子类型T <%< U 表示T可被隐式转换为U (2.10 deprecated,2.11 removed)T =:= U是严格的类型约束,要求两个类型完全相等(包括类型参数),如:List =:= List is true,但是List[Apple] =:= List[Orange] is false;
T <:< U是严格的类型约束(与<:相比),要求前者T必须是后者U的子类或类型相同;
T <%< U在scala 2.10已经被标注deprecated,在scala 2.11被移除;同时视图界定<%从scala 2.11开始被标记为deprecated,在未来版本可能会被移除掉,在大家用到视图界定时候最好的选择是在需要隐式转换的地方进行显式的声明(可关注本文视图界定所讲述的内容);
注: 类型约束顾名思义是为对类型进行约束的,仅此而已;
/**
* restrict: T eq Orange
*/
def test0[T](i: T)(implicit ev: T =:= List[Fruit]): Unit = {
i.foreach(println)
}
/**
* same as [[test0()]]
*/
def test1[T](i: T)(implicit ev: =:=[T, Orange]): Unit = {
println(i)
}
def test2[T](list: List[T])(implicit ev: T <:< Fruit): Unit = {
val lis = new Fruit :: list
lis.foreach(println)
}
说明:
=:=, <:<实际上是两个在Predef.scala定义好的两个类(sealed abstract class =:=[From, To] extends (From => To) with Serializable 与 sealed abstract class <:<[-From, +To] extends (From => To) with Serializable),并在scala编译器的协助之下完成类型约束的行为,该行为发生于代码编译期间(感慨scala各种语法糖给编译器带来了大量的编译压力哈哈~~);
上述函数test0, test2中隐式参数的类型是scala的中缀写法,原始写法如test1所示=:=[A,B](带两个类型参数的类),当类有且仅有两个类型参数时候才能用中缀写法,更直观示例:def foo(f: Function1[String, Int])可以替换为中缀形式def foo(f: String Function1 Int);
T <:< U与T <: U的异同点说明,二者都表示T是U的子类,但<:<是更严格的类型约束, 要求在满足T是U子类的条件时不能对T做类型推导&隐式转换的操作;而<:则可以与类型推导&隐式转换配合使用;
下述代码在main函数中用test3(1, List(new Apple))调用def test3[A, B <: A](a: A, b: B)时编译器正常编译通过,调用test3时传入的第一个参数是Int,第二个参数是List[Apple],显然不符合B <: A的约束,为了满足这个约束编译器在做类型推导时会继续向上寻找父类型来匹配是否满足,于是第一个参数被推导为Any类型,此时List[Int]符合Any的子类型,编译通过(用Java Decompiler反编译字节码发现main函数中调用test3的入参类型为Int, List,而这种入参类型不符合test3编译后的入参要求B extends A,即字节码合法性校验较编译器宽松许多,感觉需要读几本编译原理入门下,哈~);
def test3[A, B <: A](a: A, b: B): Unit = {
println(a + ", " + b)
}
// 调用函数
def main(args: Array[String]): Unit = {
test3(1, List(new Apple))
}
下述代码中在调用foo时传入的Apple,Orange不能满足Apple <: Orange,但由于隐式函数a2o的存在,在编译main函数中的第一行 foo(new Apple, new Orange)时会引入隐式参数,将Apple转换为Orange,所以可成功编译并执行;而在编译第二行bar(new Apple, new Orange)会报错error: Cannot prove that Apple <:< Orange;
implicit def a2o(a:Apple) = new Orange
def foo[T, U<:T] (u:U, t:T) = print("OK")
def bar[T, U](u:U, t:T)(implicit f: U <:< T) = println("OK")
def main(args: Array[String]): Unit = {
// 编译成功,引入隐式函数`a2o`
foo(new Apple, new Orange)
// 编译失败,`<:<`是严格类型限定
// error: Cannot prove that Apple <:< Orange
bar(new Apple, new Orange)
}
scala的协变、逆变、不变的特性相比较其他更为的难理解,计划单独新开一篇文详细对比讲解;感兴趣的小伙伴们可以移驾:
https://itinycheng.github.io/2019/01/30/scala-vs-java’s-generics-2/
scala类型参数体系是scala语言的重要特性,相比Java泛型,其更为复杂多变,且与编译器&隐式转换等互相掺杂配合,使得大家很难在短时间内掌握和灵活使用,建议大家多看scala源码,多练习,多尝试,多用scala command, Java Decompiler等工具测试&反编译二进制文件,以求更深刻的了解scala类型参数的语法特性&内在原理。
本文示例的代码存储在工程:https://github.com/itinycheng/jvm-lang-tutorial ,包com.tiny.lang.java.generic 和 com.tiny.lang.scala.generic中。
近来在与两个技术朋友聊天时发现的一个简单有趣的问题,在这做个简短的记录;本文主要围绕问题是什么,在现有技术上下文中如何解决,有哪些优缺点。
问题预告:业务层面多个时区的数据如何映射到一张Druid表,查询时满足不同时区客户查询时都能按照各自时区的实际时间来统计数据;
假设我现在有一套统计分析的Saas系统(简称:TA),其包含两个模块,用户行为收集模块(简称:uTracker)和行为数据统计分析模块(简称:uAnalyzer)类似Google Analytics或是Baidu统计。
TA系统的上下文说明:
TA的客户将uTracker引入到自己的App中,用以收集使用该APP的用户的行为数据;uTracker随着APP的启动而启动并自动收集当前用户行为数据,比如:设备、上下文、访问内容等信息(简称:uData),每条uData都带有该行为数据生成的时间戳(简称:dTs,从设备时钟获取)以及当前设备时钟对应的时区(简称:dZone)。uTracker将收集上来的数据进行合并、格式化、压缩等操作之后通过手机网络将数据发送到uAnalyzer的前置数据收集系统;uAnalyzer系统的清洗、补余,解析等等操作后入到统一的OLAP库中供给客户实时的查询他APP的新增、活跃、留存等指标数据;graph TB
A(用户A) -->|visit| UT(集成uTracker的APP)
B(用户B) -->|visit| UT
C(用户C) -->|visit| UT
D(......) -->|visit| UT
UT -->|send uData| UA(uAnalyzer的前置数据收集模块)
UA -->|pull uData| DP(uAnalyzer的数据处理模块, 以`Flink+Druid`为主)
graph LR
C(uAnalyzer解析数据) -->|sink| D(Druid: new_user)
D -->|query| E(uAnalyzer查询新增用户数服务)
TA的客户群体说明
uTracker,并使用我的Saas版TA系统;uAnalyzer系统中做统计分析;最后:TA系统有10W+的客户在用,假设大多数的客户都有以上两个企业属性;
注: 客户A, B其实可类比为亚马逊、淘宝,他们分属不同国家,其用户群体涵盖的是全球范围;
问题描述:
TA系统中的用户新增指标提出问题;用户新增指标定义:在客户查询的时间区间内该客户APP新增的用户数量;客户A是美国客户,其常用的时区为UTC-5;
客户B是中国客户,其常用时区为UTC+8;
客户A,B的用户新增指标在同一张Druid表(new_user)中;
Druid表new_user采用的时区是UTC+8,粒度是DAY;
客户A的APP所对应的用户主要来自两个区域美国本地UTC-5,中国UTC+8; 客户B的APP所对应的用户同样来自两个区域美国本地UTC-5,中国UTC+8;
问题:对于一条美国当地上报的用户数据,若其uTs在当地时间(UTC-5时区)是2018-11-13 12:00:00则对应的UTC+8时间是2018-11-14 01:00:00;其解析到Druid表时是按照Druid表设置的时区来做aggregate的,当前Durid表时区配置是UTC+8,则最终入库到Druid时会将UTC-5时间2018-11-13 12:00:00的数据解析到 到2018-11-14 00:00:00的segment(Druid表粒度是’DAY’),这就照成了在查询美国时间2018-11-13日的新增用户数实际是2018-11-12 11:00:00到2018-11-13 13:00:00的数据。
其实客户需求很简单,按照客户指定的时间区间和时区查询所有新增用户数量(即:全球范围内的新增用户数);
dataTs:日志产生的时间,日志生成时的设备时间戳;dataZone:接收到的数据中包含的时区,从设备时钟获取;druidZoneConf:Druid表创建时所设置的时区;appZoneConf:查询APP指标时期望用的时区;示例:假设某APP的用户分布在中国和美国,其上报数据的时区dataZone中包含UTC+8, UTC-5两种时区,Druid表配置的时区druidZoneConf是UTC+8,APP设置的查询所用时区appZoneConf是UTC-5;
在处理每条原始日志时对dataTs进行重新校准,校准好的newTimestamp做为Druid入库的时间戳;
伪代码:
/**
* Zone(UTC+8) - Zone(UTC-5) = 8 - (-5) = 13
*/
def calibrate = (Zone a, Zone b) => a - b
val newTimestamp = dts + calibrate(appZoneConf, druidZoneConf)
优点:支持的所有的granularity(时间粒度:秒,分,时,日,周,月,年),代码变动量小,可沿用当前的所有Druid表;
缺点:历史数据没办法处理;APP在创建时必须选择时区且后续无法变更(已入库数据无法适配变更);
在数据解析并写入Druid的过程中不对时间戳&时区做干扰处理,在指标查询过程中针对时区进行查询区间的换算;
示例:Druid表的存储数据所用的时区是UTC+8,美国用户查看指标采用的时区是UTC-5,查询的时间区间是[2018-11-11, 2018-11-13],根据时区差异校准后的数据查询区间为[2018-11-11 13:00:00, 2018-11-13 13:00:00];(druid有针对时区的查询方案,但也要求<=小时的粒度)
时间校准伪代码:
/**
* Zone(UTC+8) - Zone(UTC-5) = 8 - (-5) = 13
*/
def calibrate = (Zone a, Zone b) => a - b
val newStartTime = startTime + calibrate(appZoneConf, druidZoneConf)
val newEndTime = endTime + calibrate(appZoneConf, druidZoneConf)
注:Druid本身的查询也支持时区的转换,可参照如下链接: http://druid.io/docs/latest/querying/granularities.html
优点:不修改原始数据的时间戳,不造成歧义容易理解;筛选数据所用时区可随意变更,无历史数据迁移问题;
缺点:只支持部分granularity(时间粒度:秒,分,时),其他粒度(天,月,周,年)不支持,需要对表进行重新的设计规划;
此外:一些国家使用非整数的时区,如:印度使用的是UTC +5:30,这时在使用方案二前提下无法去查询roll-up by HOUR的表;
若在系统建设初期没考虑到跨时区等问题,到后期再更正就有存在比较大的工作量投入,且牵涉到历史数据处理迁移的问题;方案一是较为简单治标方案,比较适用于已有系统的变更;方案二则是一个比较能治标治本的方案,但需要在项目初期规划时候就考量进去,同时遇到数据量特别大的场景也需要将性能纳入考量,毕竟对同一个Druid Schema来讲粒度从DAY变为HOUR数据最大增长24倍;