Flink反压的一些总结项
反压机制:
1:数据接受速率 大于 数据输出速率,压力往上游一步步传导,直到source
2:压力传递的细节:
1 输出缓冲区:resultpartition -> local bufferpool -> network bufferpool
2 输入缓冲区:inputgate -> local bufferpool -> network bufferpool
3:credit,相比tcp滑动窗口的优点;
1 credit 反压生效的延迟比较低;
2 单个task反压,不会引起socket阻塞;
现象
1 webui的颜色(13版本)
2 Ck时间变长 =》最终ck超时失败、重试 =》job挂掉
3 状态变大
4 kafka数据积压
5 oom
原因
1 资源不合理,并行度给的不够,内存,cpu少了
2 数据倾斜
3 代码性能低
4 与外部系统交互,比如读写hbase等外部系统
定位
1 通过webui定义:如果要精确定位到哪一个算子,可以先禁用operator chain
1:上游全是high,第一个为ok的节点就是瓶颈节点
2:短暂的情况:上游全是ok,下游都是high,第一个为high的节点最终会和1一样
2 通过metrics定位:
buffer.inpoolusage buffers.outpoolusage
低 低 =》正常或瓶颈节点
低 高 =》被反压的节点
高 低 =》瓶颈节点
高 高 =》被反压的节点
分析处理
1 排查 数据是否倾斜
2 分析瓶颈节点的代码
3 借助工具分析代码性能:(线程 调用栈,执行时长)
1:指定参数,开启火焰图
2:在webui,打开瓶颈节点的火焰图,查看分析火焰图:纵向:调用链,最上面的是执行中的;横向:可以理解为执行时长;总结,找”大平顶“ =》对应的代码(如果不好找,往下接着找)
4 分析GC
1 参数指定,打印gc日志
2 从webui下载
3 用gc工具(gcviewer)打开gc日志,查看fullgc情况,分析内存泄漏,fullgc后的内存使用量,fullgc的次数,fullgc的暂停时长
5 与外部系统交互
1 对hbase的优化:异步io,提数并发效率;旁路缓存:提升读取效率
2 对mysql,ch等数数据库:一般来讲:攒批(几千~几万一批)
flink与kafka结合使用的一些问题:
1:flink消费kafka
source算子并行度 > kafka分区数 不会丢数据,但是个别sourced的子任务没有消费数据,导致watermark不更新,导致事件事件语义的窗口无法触发,可以设置watermark的空闲等待功能
source算子并行度 < kafka分区数 不会丢数据,但是source的子任务之间数据倾斜,可以在source后调用重分区算子,rescale等分区策略
source算子并行度 = kafka分区数 最好配置
2:flink写入kafka
sink算子并行度 > kafka分区数 不会丢数据
sink算子并行度 < kafka分区数 不会丢数据,但会导致kafka的部分分区没有数据写入,进一步导致后续消费改topic的flink程序,出现watermark不更新的问题
sink算子并行度 = kafka分区数 最好配置
3 kafka增加分区,会有什么问题;
1 生产者能识别新分区,写入数据,但消费者不会主动发现新分区,导致丢数;flink开启kafka动态分区发现功能,设置间隔为10s查询kafka的元数据
2 flink消费kafka的offset是怎么维护的?自动提交? offset由flink手动维护,不会自动提交,source算子将offset保存在算子状态list里,不是普通的list,而是联合list: unionlist保存
3 flink任务的tm和jm设置:
1 tm设置4G,jm设置2G
通用设置1cu = 1个cpu + 4G内存;如果当并行度为5 每个TM = 2个slot 3个tm,1个jm,那么启动yarn会启动4个容器,每个容易多少cpu,多少内存?这个与yarn的配置策略有关系,如果使用默认测试,一个容器只会申请一个cpu,如果使用Dominant测试,一个slot会使用一个cpu,
总结:
1:并行度:3个(kafka分区)
2:每个tm的slot数:1拖N,3(节省资源)
3:内存:原则上 1cu = 1cpu+ 4G内存,再观察调整TM:4G,JM:2G
4:cpu设置:如果想要一个slot申请一个cpu,就是用Dominant策略;
checkpoint设置:
1:间隔:兼顾性能和延迟,设置30S
2:语义:精准一次
3:最小等待间隔:间隔的0.5-1倍之间都可以
4:超时
5:失败次数
6:ck保留外部系统
大状态调优
1 状态上G级别:
1 使用内存存储的缺陷:1:使用hashmap只能全量,效率慢; 2:浪费内存资源,tm内存还得再加
2 使用rocksdb:
1:每次都是增量,而且是一个k-v数据库,存储的是序列化后的数据
2:开启增量检查点功能
3:开启本地恢复功能
4:设置预定义选项为:disk and high mem
5:开启性能监控
链路延迟检测
1:实时数仓的全链路延迟是多少?
1:watermark:经验值,要求延迟低- 秒级(5S),对要求不高的可以-分钟
2:ck间隔:30s
3:flink程序本身的处理延迟:5S窗口,计算性能(0.03ms)
4:网络延迟:偶尔1s
整体算下来是:全链路延迟 --- 分钟级别