反压机制:

1:数据接受速率 大于 数据输出速率,压力往上游一步步传导,直到source

2:压力传递的细节:

    1 输出缓冲区:resultpartition -> local bufferpool -> network bufferpool

    2 输入缓冲区:inputgate -> local bufferpool -> network bufferpool

3:credit,相比tcp滑动窗口的优点;

1 credit 反压生效的延迟比较低;

2 单个task反压,不会引起socket阻塞;

现象

1 webui的颜色(13版本)

2 Ck时间变长 =》最终ck超时失败、重试 =》job挂掉

3 状态变大

4 kafka数据积压

5 oom

原因

1 资源不合理,并行度给的不够,内存,cpu少了

2 数据倾斜

3 代码性能低

4 与外部系统交互,比如读写hbase等外部系统

定位

1 通过webui定义:如果要精确定位到哪一个算子,可以先禁用operator chain

    1:上游全是high,第一个为ok的节点就是瓶颈节点

    2:短暂的情况:上游全是ok,下游都是high,第一个为high的节点最终会和1一样

2 通过metrics定位:

    buffer.inpoolusage buffers.outpoolusage

    低 低 =》正常或瓶颈节点

    低 高 =》被反压的节点

    高 低 =》瓶颈节点

    高 高 =》被反压的节点

分析处理

1 排查 数据是否倾斜

2 分析瓶颈节点的代码

3 借助工具分析代码性能:(线程 调用栈,执行时长)

    1:指定参数,开启火焰图

    2:在webui,打开瓶颈节点的火焰图,查看分析火焰图:纵向:调用链,最上面的是执行中的;横向:可以理解为执行时长;总结,找”大平顶“ =》对应的代码(如果不好找,往下接着找)

4 分析GC

    1 参数指定,打印gc日志

    2 从webui下载

    3 用gc工具(gcviewer)打开gc日志,查看fullgc情况,分析内存泄漏,fullgc后的内存使用量,fullgc的次数,fullgc的暂停时长

5 与外部系统交互

    1 对hbase的优化:异步io,提数并发效率;旁路缓存:提升读取效率

    2 对mysql,ch等数数据库:一般来讲:攒批(几千~几万一批)

flink与kafka结合使用的一些问题:

1:flink消费kafka

    source算子并行度 > kafka分区数 不会丢数据,但是个别sourced的子任务没有消费数据,导致watermark不更新,导致事件事件语义的窗口无法触发,可以设置watermark的空闲等待功能

    source算子并行度 < kafka分区数 不会丢数据,但是source的子任务之间数据倾斜,可以在source后调用重分区算子,rescale等分区策略

    source算子并行度 = kafka分区数 最好配置

2:flink写入kafka

    sink算子并行度 > kafka分区数 不会丢数据

    sink算子并行度 < kafka分区数 不会丢数据,但会导致kafka的部分分区没有数据写入,进一步导致后续消费改topic的flink程序,出现watermark不更新的问题

    sink算子并行度 = kafka分区数 最好配置

3 kafka增加分区,会有什么问题;

    1 生产者能识别新分区,写入数据,但消费者不会主动发现新分区,导致丢数;flink开启kafka动态分区发现功能,设置间隔为10s查询kafka的元数据

    2 flink消费kafka的offset是怎么维护的?自动提交? offset由flink手动维护,不会自动提交,source算子将offset保存在算子状态list里,不是普通的list,而是联合list: unionlist保存

    3 flink任务的tm和jm设置:

        1 tm设置4G,jm设置2G

        通用设置1cu = 1个cpu + 4G内存;如果当并行度为5 每个TM = 2个slot 3个tm,1个jm,那么启动yarn会启动4个容器,每个容易多少cpu,多少内存?这个与yarn的配置策略有关系,如果使用默认测试,一个容器只会申请一个cpu,如果使用Dominant测试,一个slot会使用一个cpu,

总结:

    1:并行度:3个(kafka分区)

    2:每个tm的slot数:1拖N,3(节省资源)

    3:内存:原则上 1cu = 1cpu+ 4G内存,再观察调整TM:4G,JM:2G

    4:cpu设置:如果想要一个slot申请一个cpu,就是用Dominant策略;

checkpoint设置:

    1:间隔:兼顾性能和延迟,设置30S

    2:语义:精准一次

    3:最小等待间隔:间隔的0.5-1倍之间都可以

    4:超时

    5:失败次数

    6:ck保留外部系统

大状态调优

    1 状态上G级别:

        1 使用内存存储的缺陷:1:使用hashmap只能全量,效率慢; 2:浪费内存资源,tm内存还得再加

    2 使用rocksdb:

        1:每次都是增量,而且是一个k-v数据库,存储的是序列化后的数据

        2:开启增量检查点功能

        3:开启本地恢复功能

        4:设置预定义选项为:disk and high mem

        5:开启性能监控

链路延迟检测

    1:实时数仓的全链路延迟是多少?

        1:watermark:经验值,要求延迟低- 秒级(5S),对要求不高的可以-分钟

        2:ck间隔:30s

        3:flink程序本身的处理延迟:5S窗口,计算性能(0.03ms)

        4:网络延迟:偶尔1s

        整体算下来是:全链路延迟 --- 分钟级别