暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

百亿流水-Flink去重实践

lex技术 2021-12-29
2991

1、背景

    业务方需求如下背景,需要对几个亿级基数的维度进行天去重。举例如下,用户维度、歌曲(或者文章、电影本文以歌曲为例)、行为(播放、收藏、分享等行为)。用户维度基数假定亿级、歌曲维度基数假定亿级、行为基数20。需求是对某个用户在某首歌播放次数超过10次以后的进行去重。

2、去重方案

    一般flink去重的话有几种方式可选择。

  • 布隆过滤器,不过由于布隆过滤器有误差,所以我们这里并没有采用。

  • 引入外部存储比如redis、hbase,即将各个维度数据组合作为key,一天次数作为value。经过计算一天大概是500G量。由于redis成本较大,并且涉及网络io,以及引入外部组件的不稳定性,该没有采取该方案。

  • Flink RocksDb状态后端去重。该方案是Flink自身保证,稳定性高,并且RocksDb是存储在本地磁盘。存储量大、并且不涉及网络IO。而且Flink sql 原生支持row number方式去重,Sql方式维护方便。因此最终我们选择了此方案。

3、Flink Sql 去重

    如下Flink sql提供了标准去重语法

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]


参数说明
ROW_NUMBER()计算行号的OVER窗口函数。行号从1开始计算。
PARTITION BY col1[, col2..]可选。指定分区的列,即去重的KEYS。
ORDER BY timeAttributeCol asc desc指定排序的列,必须是一个的字段(即Proctime或Rowtime)。可以指定顺序(Keep FirstRow)或者倒序 (Keep LastRow)。
rownum外层查询中对排名进行过滤,只取前N条

3.1 Deduplication方式

    当rownum<=1时,flink采用的是Deduplication方式进行去重。该方式有两种去重方案:有保留第一条(Deduplicate Keep FirstRow)和保留最后一条(Deduplicate Keep LastRow)2种。

  •     Deduplicate Keep FirstRow保留首行的去重策略:保留KEY下第一条出现的数据,之后出现该KEY下的数据会被丢弃掉。因为STATE中只存储了KEY数据,所以性能较优。

SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
FROM T
)
WHERE rowNum = 1

  • Deduplicate Keep LastRow保留末行的去重策略:保留KEY下最后一条出现的数据。因此过程中会产生变更的记录,会下下游发送变更的消息。因此,sink表需要支持update操作。

SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
FROM T
)
WHERE rowNum = 1

    Deduplicate Keep FirstRow方式去重原理是在flink中维护一个boolean类型的状态,来判断是否已经存在该记录。例如:如果对应的key不存在,则将该值设置为true,后续的记录通过key获取该值来判断是否是重复。Deduplicate Keep LastRow也是类似的实现,不过需要保留最后一个的记录,且会产生变更的记录。

/**
* Processes element to deduplicate on keys with process time semantic, sends current element if
* it is first row.
*
* @param currentRow latest row received by deduplicate function
* @param state state of function
* @param out underlying collector
*/
static void processFirstRowOnProcTime(
RowData currentRow, ValueState<Boolean> state, Collector<RowData> out)
throws Exception {


checkInsertOnly(currentRow);
// ignore record if it is not first row
if (state.value() != null) {
return;
}
state.update(true);
// emit the first row which is INSERT message
out.collect(currentRow);
}


3.2 top N 方式

    3.2.1 去重原理

   经典的TopN算法:从已经存在的数组中,找出最大(或最小)的前n个元素。算法(以找最大的n个元素为例):


  • 取出数组的前n个元素,创建长度为n的小根堆;

  • 从n开始循环数组的剩余元素,如果当前元素比小根堆的根节点大,则将当前元素设置成小根堆的根节点,并通过调整让堆保持小根堆;

  • 循环完成后,小根堆中的所有元素就是需要找的最大的n个元素;

  • 根据需要对小根堆中的所有元素继续利用堆排序算法进行排序。

最终,堆中的结果就是top N。

flink 也是采用一个类似处理,每一个需要去重的key,维护了一个堆,然后对后续对应key过来的数据更新这个堆。这些堆信息存储在flink状态中。

    3.2.2 去重策略

  • AppendFast:结果只追加,不更新;

  • Retract:类似于回撤流,结果会更新,前提是输入数据没有主键,或者主键与partitionKey不同;

  • UpdateFast:快速更新,前提是输入数据有主键,且结果单调递增/递减,还要求orderKey的排序规则与结果的单调性相反(例:ORDER BY sum(quantity) DESC )。可见它的效率最高,但是也最苛刻。

    3.2.3 状态保存

    前面可以看到,去重是基于状态实现的。因此,控制好状态的大小,即可以实现按照1 hour 1day等进行去重。如下所示:

    

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
tEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));


4、Flink sql改造

    由于我们的场景比较特殊,不需要求出top N数据,只需要按照时间过滤掉当天出现N次以后的数据即可。因此,这种场景可以进行优化,在flink中维护一个integer的状态。对于同一个key值,用integer来统计次数,如果次数大于指定的值,则丢弃该数据,否则保留该数据并且状态值+1。这种方式只用了一个integer类型的值,来替代一个具有N个元素的堆。时间和空间复杂度大大减低。(该方案由jerry哥提出和改造)

部分代码如下:


@Override
public void processElement(RowData input, Context context, Collector<RowData> out)
throws Exception {
initRankEnd(input);


// check message should be insert only.
Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
int currentRank = state.value() == null ? 0 : state.value();
// ignore record if it does not belong to the first-n rows
if (currentRank >= rankEnd) {
return;
}
currentRank += 1;
state.update(currentRank);


if (outputRankNumber) {
collectInsert(out, input, currentRank);
} else {
collectInsert(out, input);
}
}


5、Flink sql问题

    初始去重sql如下:

select SELECT_COLUMNS from 
(
select SELECT_COLUMNS
from
(
        SELECT *, ROW_NUMBER() OVER (PARTITION BY song_id, user_id, cmd, FLOOR(proc_time TO dayorder by proc_time asc ) as row_num 
from tableA
        where  cmd = 1 and user_id > 0 and playtime >=(duration*0.8) and duration>0
)
where row_num <=10
)
union all
(
select SELECT_COLUMNS from
(
SELECT *, ROW_NUMBER() OVER (PARTITION BY song_id, user_id, cmd, FLOOR(proc_time TO day) order by proc_time asc) as row_num
from tableA
        where cmd = 2 and user_id > 0 and playtime >=(duration*0.8) and duration>0
)
where row_num <=3
)
union all
(
select SELECT_COLUMNS from

SELECT *, ROW_NUMBER() OVER (PARTITION BY song_id, user_id, cmd, FLOOR(proc_time TO day) order by proc_time asc) as row_num
from tableA
        where cmd =3  and user_id > 0  and playtime >=(duration*0.8) and duration>0
)
where row_num <= 1
)

    如上述sql所示。cmd是行为(假定为1是播放歌曲、2是收藏歌曲、3是分享歌曲),PARTITION BY后的是不同的维度歌曲、用户、行为、日期(天去重)。对同一个table的不同的行为进行不同的次数的去重,然后将去重后的结果union起来,就是我们的数据结果。

5.1 sql union问题

    最开始时union all写成了union,发现性能极低,flink operator中出现了GroupAggregate算子,且是对所有的字段进行group by。分析原因是由于使用union关键字。union关键字不允许重复,会对结果再次进行去重,union相当于 union all + distinct。因此,后续改成了union all将各个cmd行为下的去重结果合并。如下图所示,状态明显降低。

union状态:

修改为union all的状态:

    另外写sql的时候,有小伙伴过滤条件是写在外层,导致状态中存储了大量无效的key。比如如下过滤条件: user_id > 0  and playtime >=(duration*0.8) and duration>0可以过滤大量的脏数据,但是如果放在外层,将会导致里层的ROW_NUMBER() OVER PARTITION BY 时,会将大量的脏数据也加入到状态中了。

    select SELECT_COLUMNS  from 
SELECT *, ROW_NUMBER() OVER (PARTITION BY song_id, user_id, cmd, FLOOR(proc_time TO day) order by proc_time asc) as row_num
from tableA
        where cmd =3  
)
    where row_num <= 1 and user_id > 0 and playtime >=(duration*0.8and duration>0


6、flink性能优化

6.1 flink背压问题解决

    对应的sql已经写好,提交到集群上运行。作业运行2个小时后,发现有严重背压。

    通过arthas命令代码总是在执行backend.db.get(),也就是在执行从状态中获取状态值。如前所述,flink会把PARTITION BY后的各个维度的组合为key,value为对应的key出现的次数。每来一条记录,都会去状态中获取key对应的value(次数),来判断该条记录是否被丢弃。



AppendOnlyFirstNFunction.processElement()
-->RocksDBValueState.value()
-->backend.db.get()


AppendOnlyFirstNFunction类:

    @Override
public void processElement(RowData input, Context context, Collector<RowData> out)
throws Exception {
initRankEnd(input);


// check message should be insert only.
Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);


Integer stateValue = state.value();
int currentRank = stateValue == null ? 0 : stateValue;
// ignore record if it does not belong to the first-n rows
if (currentRank >= rankEnd) {
return;
}
currentRank += 1;
state.update(currentRank);


if (outputRankNumber) {
collectInsert(out, input, currentRank);
} else {
collectInsert(out, input);
}
}

RocksDBValueState

@Override
public V value() {
try {
byte[] valueBytes = backend.db.get(columnFamily,
serializeCurrentKeyWithGroupAndNamespace());




if (valueBytes == null) {
return getDefaultValue();
}
dataInputView.setBuffer(valueBytes);
return valueSerializer.deserialize(dataInputView);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
}
}

    该db.get该函数是由rocksdb提供,是否可以增加缓存来提升速度。如下图所示,能否增大rocksdb缓存来提升rocksdb读性能。查询rocksdb缓存设置,发现flink rocksdb的读缓存(state.backend.rocksdb.block.cache-sizeblock)默认为8MB,设置的太小,导致大量的读打到磁盘。随机增加flink taskManger内存,并将state.backend.rocksdb.block.cache-sizeblock内存设置为512MB。


    调整flink内存和rocksdb读缓存以后,flink作业背压解决。



6.2 flink beyond the 'PHYSICAL' memory limit. 

    如上调优以后,运行2天左右,作业内存超过了yarn容器分配的物理内存的限制,被yarn kill掉了。

2021-02-25 06:49:25,593 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: hadoop02.tcd.com/9.44.33.8:60899
2021-02-25 06:49:25,593 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@hadoop02.tcd.com:60899] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@hadoop02.tcd.com:60899]] Caused by: [java.net.ConnectException: Connection refused: hadoop02.tcd.com/9.44.33.8:60899]
2021-02-25 06:49:32,762 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker container_e26_1614150721877_0021_01_000004 is terminated. Diagnostics: [2021-02-25 06:49:31.879]Container [pid=24324,containerID=container_e26_1614150721877_0021_01_000004] is running 103702528B beyond the 'PHYSICAL' memory limit. Current usage: 4.1 GB of 4 GB physical memory used; 6.3 GB of 8.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_e26_1614150721877_0021_01_000004 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 24551 24324 24324 24324 (java) 1130639 94955 6799687680 1073522 /usr/java/jdk1.8.0_131/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager.rpc.address=hadoop02.tcd.com -Djobmanager.memory.jvm-overhead.min=201326592b -Dpipeline.classpaths= -Dtaskmanager.resource-id=container_e26_1614150721877_0021_01_000004 -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b -Dexecution.target=embedded -Dweb.tmpdir=/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8 -Dinternal.taskmanager.resource-id.metadata=hadoop03.tcd.com:8041 -Djobmanager.rpc.port=54474 -Dpipeline.jars=file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar -Drest.address=hadoop02.tcd.com -Djobmanager.memory.jvm-metaspace.size=268435456b -Djobmanager.memory.heap.size=1073741824b -Djobmanager.memory.jvm-overhead.max=201326592b
|- 24324 24315 24324 24324 (bash) 1 0 11046912 372 /bin/bash -c /usr/java/jdk1.8.0_131/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager.rpc.address='hadoop02.tcd.com' -Djobmanager.memory.jvm-overhead.min='201326592b' -Dpipeline.classpaths='' -Dtaskmanager.resource-id='container_e26_1614150721877_0021_01_000004' -Dweb.port='0' -Djobmanager.memory.off-heap.size='134217728b' -Dexecution.target='embedded' -Dweb.tmpdir='/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8' -Dinternal.taskmanager.resource-id.metadata='hadoop03.tcd.com:8041' -Djobmanager.rpc.port='54474' -Dpipeline.jars='file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar' -Drest.address='hadoop02.tcd.com' -Djobmanager.memory.jvm-metaspace.size='268435456b' -Djobmanager.memory.heap.size='1073741824b' -Djobmanager.memory.jvm-overhead.max='201326592b' 1> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.out 2> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.err


[2021-02-25 06:49:31.896]Container killed on request. Exit code is 143
[2021-02-25 06:49:31.908]Container exited with a non-zero exit code 143.

    同时发现虚拟内存占用挺大,于是顺便解决了一下glibc Thread Arena 问题,flink-conf.yaml中添加该参数containerized.taskmanager.env.MALLOC_ARENA_MAX: 1。不过没啥作用,真正原因是物理内存超用了。



    只好通过tcmalloc来分析一下task的堆外内存,经过安装tcmalloc工具。将对应的内存都dump下来观察。没发现啥异常。。。推测是不是由于rocksdb的内存不可控,导致超用了物理内存。因此,降rocksdb缓存有512MB降低到128MB,同时将flink manager内存增加。


    在此时,偶然发现flink状态并没有按天过期被清理。导致state不停地在增加。

6.3 flink状态定期清理

    flink中有两种状态清理机制:Flink State TTL 机制、Idle State Retention Time 。

    目前该去重算子采用Flink State TTL 机制。该机制默认方案是:需要再次访问,才能清理过期的状态。而我们这里按照flink状态中key包含的是LOOR(proc_time TO day)时间。举个例子,今天是12月28日,那么新的flink状态中的key就会包含12月28日,而到了12月29日的记录key都是12月29日,而12月28日的key永远也不会被访问了。不能被访问那么就不会被Flink State TTL 机制清理掉。如此,flink中的状态将无限制增加。

/**
* Utility to create a {@link StateTtlConfig} object.
* */
public class StateTtlConfigUtil {


/**
* Creates a {@link StateTtlConfig} depends on retentionTime parameter.
* @param retentionTime State ttl time which unit is MILLISECONDS.
*/
public static StateTtlConfig createTtlConfig(long retentionTime) {
if (retentionTime > 0) {
return StateTtlConfig
.newBuilder(Time.milliseconds(retentionTime))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
} else {
return StateTtlConfig.DISABLED;
}
}
}

有两种办法解决:

1、Flink State TTL 机制增加了cleanupFullSnapshot和cleanupInRocksdbCompactFilter方法进行清理老的状态,即使没有被访问。    

    cleanupFullSnapshot:完整快照时的过期状态剔除老的状态。

    cleanupInRocksdbCompactFilter:可指定queryTimeAfterNumEntries参数。当写入多少条状态数据后,更新当前的时间戳。当 RocksDB 在后台执行 Compaction 操作时,根据当前的时间戳来判断该状态是否过期,然后过滤掉那些失效的 Key 及 Value. 如果这个值queryTimeAfterNumEntries设置的小,将会提升清理状态的速度。由于Flink 通过JNI 的方式来调用rocksdb代码的,因此频繁调用会有较大开销。


    /**
* Cleanup expired state while Rocksdb compaction is running.
*
* <p>RocksDB compaction filter will query current timestamp,
* used to check expiration, from Flink every time after processing {@code queryTimeAfterNumEntries} number of state entries.
* Updating the timestamp more often can improve cleanup speed
* but it decreases compaction performance because it uses JNI call from native code.
*
* @param queryTimeAfterNumEntries number of state entries to process by compaction filter before updating current timestamp
*/
@Nonnull
public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) {
strategies.put(
CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER,
new RocksdbCompactFilterCleanupStrategy(queryTimeAfterNumEntries));
return this;
}

    queryTimeAfterNumEntries压测发现当设置为1000时,当增加流量时,性能下降比较明显,产生明显的背压。消费lag持续增加。最终经过线上数据压测,将该值设置为100000L,达到较好的效果。


  public static StateTtlConfig createTtlConfig(long retentionTime) {
if (retentionTime > 0) {
return StateTtlConfig
.newBuilder(Time.milliseconds(retentionTime))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot()
.cleanupInRocksdbCompactFilter(100000L)
.build();
} else {
return StateTtlConfig.DISABLED;
}
}

    加上这两个配置后,发现老的状态已经被清理了,状态始终维持在一定大小。

    


2、Idle State Retention Time 机制

    修改源代码,将清理机制修改为Idle State Retention Time。该机制是通过timer来清理掉老的状态,因此即使该状态没有被访问也可以被清理。但是Idle State Retention Time会将timer也加入到flink state中,导致总的状态膨胀很多。


实验对比:

Flink State TTL 机制:checkpoint周期为1分钟一次,运行20分钟checkpoint大



Idle State Retention Time 机制:checkpoint周期为1分钟一次,运行20分钟checkpoint大小

换成idle state 以后状态膨胀了3.26倍 6.2/1.9= 3.26倍


对比发现Flink State TTL 机制状态更小,性能更加,最终采用了Flink State TTL 机制。

6.4 Rocksdb Merge调优

    经过上述优化后,作业比较稳定。但是偶发性出现cpu使用率很高的情况。通过perf命令定位发现,cpu爆高时,rocksdb频繁的在做Merge操作。随机调优rocksdb merge。

    state.backend.rocksdb.writebuffer.number-to-merge:默认值为 1,决定了 Write Buffer 合并的最小阈值。

    state.backend.rocksdb.thread.num:默认值为1,这个参数允许用户增加最大的后台 Compaction 和 Flush 操作的线程数。提高该值可以明显提升后台merge性能。

    state.backend.rocksdb.writebuffer.count:默认值为2,内存中允许保留的 MemTable 最大个数,超过count后,就会被 Flush 刷写到磁盘上成为 SST 文件。

    经过线上实际压测,将值调整如下,调整以后cpu不在告警。

 state.backend.rocksdb.writebuffer.number-to-merge: 2
state.backend.rocksdb.thread.num: 8
state.backend.rocksdb.writebuffer.count: 3


7、线上压测

    18个TaskManager,TM内存10G,60亿key,checkpoint 大小250G的条件下,去重作业消费速度约:18w/s,作业是无背压、稳定。

  



参考文章:

https://developer.aliyun.com/article/772873

https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247490197&idx=1&sn=b0893a9bf12fbcae76852a156302de95

https://cloud.tencent.com/developer/news/755802

https://cloud.tencent.com/developer/article/1452844

文章转载自lex技术,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论