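When it comes to deduplication (distinct counting) in Flink real-time statistics, a few popular approaches come to mind:
Bloom filter - approximate deduplication. The precision is configurable, but the higher the precision, the greater the overhead. You can use Guava's implementation, or build your own on top of Redis bit operations and reuse Guava's hash algorithm.
HyperLogLog - approximate distinct counting based on cardinality estimation. Its advantage is that even when the number or volume of input elements is very large, the space needed to compute the cardinality stays fixed and small.
BitMap - exact deduplication with a small memory footprint (when the data is relatively evenly distributed). The drawback is that it only works for numeric types (int or long).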
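To make the bitmap idea concrete, here is a minimal sketch that uses the JDK's `java.util.BitSet` as a stand-in for RoaringBitmap. The class and method names are illustrative only; the point is that setting a bit twice changes nothing, so the number of set bits is the exact distinct count.

```java
import java.util.BitSet;

/** Exact distinct counting over non-negative int ids with a plain bitmap. */
final class BitSetDistinctSketch {

    /** Returns how many different ids occur in the input. */
    static int countDistinct(int[] ids) {
        BitSet seen = new BitSet();
        for (int id : ids) {
            // Setting a bit that is already set is a no-op; that is the deduplication.
            seen.set(id);
        }
        return seen.cardinality();
    }

    public static void main(String[] args) {
        int[] ids = {3, 7, 3, 42, 7, 7, 1};
        System.out.println(countDistinct(ids)); // prints 4
    }
}
```

A plain BitSet allocates memory up to the largest id it has seen, which is why a compressed bitmap is preferable when ids are large.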
This article describes a Flink deduplication scheme based on RoaringBitmap. First, add the dependencies:
<dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>0.8.13</version>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.11.6</version>
</dependency>
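Building the BitIndex
BitMap works well, but the field being deduplicated must be an int or a long. What if it is not? In that case we build a mapping table from the field value to a BitIndex, where the bitIndex increases from 1, for example {a = 1, b = 2, c = 3}. At use time, look up the bitindex for the field in the mapping table; if there is none, generate one globally. Here I use Redis as the mapping table. The implementation lives in a MapFunction, as follows: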
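The lookup-or-generate step can be sketched without Redis. The example below is an in-memory stand-in under stated assumptions: a `ConcurrentHashMap` plays the role of the Redis mapping table and an `AtomicInteger` plays the role of the global counter. The class name `BitIndexRegistrySketch` is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** In-memory stand-in for the "field -> bitIndex" mapping table. */
final class BitIndexRegistrySketch {

    private final Map<String, Integer> indexByKey = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Returns the existing bitIndex for the key, or assigns the next one, starting at 1. */
    int indexOf(String key) {
        // computeIfAbsent on ConcurrentHashMap is atomic per key, so a key never gets two indexes.
        return indexByKey.computeIfAbsent(key, k -> counter.incrementAndGet());
    }

    public static void main(String[] args) {
        BitIndexRegistrySketch registry = new BitIndexRegistrySketch();
        System.out.println(registry.indexOf("a")); // prints 1
        System.out.println(registry.indexOf("b")); // prints 2
        System.out.println(registry.indexOf("a")); // prints 1
        System.out.println(registry.indexOf("c")); // prints 3
    }
}
```

In a distributed job the map and the counter live in Redis, so the atomicity that `computeIfAbsent` gives here has to be provided by a distributed lock or a similar mechanism.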
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.commons.lang3.StringUtils; // assuming commons-lang3
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.redisson.Redisson;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class BitIndexBuilderMap extends RichMapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>> {

    private static final Logger LOG = LoggerFactory.getLogger(BitIndexBuilderMap.class);

    private static final String GLOBAL_COUNTER_KEY = "FLINK:GLOBAL:BITINDEX";
    private static final String GLOBAL_COUNTER_LOCKER_KEY = "FLINK:GLOBAL:BITINDEX:LOCK";
    private static final String USER_BITINDEX_SHARDING_KEY = "FLINK:BITINDEX:SHARDING:";

    /**
     * Spread the user ids over 100 Redis maps so that no single map grows without bound
     * and the sharding of Redis Cluster is put to good use.
     */
    private static final Integer REDIS_CLUSTER_SHARDING_MODE = 100;

    private HashFunction hash = Hashing.crc32();
    private transient RedissonClient redissonClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        // ParameterTool globalPara = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        Config config = new Config();
        config.setCodec(new StringCodec());
        config.useClusterServers()
                .addNodeAddress(getRedissonNodes("redis1:8080,redis2:8080,redis3:8080"))
                .setPassword("xxxx")
                .setMasterConnectionPoolSize(2)
                .setMasterConnectionMinimumIdleSize(1)
                .setSlaveConnectionPoolSize(2)
                .setSlaveConnectionMinimumIdleSize(1)
                .setConnectTimeout(10000)
                .setTimeout(10000)
                .setIdleConnectionTimeout(10000);
        redissonClient = Redisson.create(config);
    }

    /**
     * Turns the userId into an incrementing index by keeping an id mapping in Redis.
     */
    @Override
    public Tuple3<String, String, Integer> map(Tuple2<String, String> in) throws Exception {
        String userId = in.f0;
        // Pick the shard (one of the 100 Redis maps) for this userId
        int shardingNum = Math.abs(hash.hashBytes(userId.getBytes()).asInt()) % REDIS_CLUSTER_SHARDING_MODE;
        String mapKey = USER_BITINDEX_SHARDING_KEY + shardingNum;
        RMap<String, String> rMap = redissonClient.getMap(mapKey);
        // If there is no bitIndex yet, generate one
        String bitIndexStr = rMap.get(userId);
        if (StringUtils.isEmpty(bitIndexStr)) {
            LOG.info("bitIndex for userId[{}] is empty, generating one", userId);
            RLock lock = redissonClient.getLock(GLOBAL_COUNTER_LOCKER_KEY);
            // tryLock returns false on timeout; without this check unlock() would fail in finally
            if (!lock.tryLock(60, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for lock " + GLOBAL_COUNTER_LOCKER_KEY);
            }
            try {
                // Read again: another task may have generated the index while we waited for the lock
                bitIndexStr = rMap.get(userId);
                if (StringUtils.isEmpty(bitIndexStr)) {
                    RAtomicLong atomic = redissonClient.getAtomicLong(GLOBAL_COUNTER_KEY);
                    bitIndexStr = String.valueOf(atomic.incrementAndGet());
                    rMap.put(userId, bitIndexStr);
                }
            } finally {
                lock.unlock();
            }
            LOG.info("bitIndex for userId[{}] generated, bitIndex: {}", userId, bitIndexStr);
        }
        return new Tuple3<>(in.f0, in.f1, Integer.valueOf(bitIndexStr));
    }

    @Override
    public void close() throws Exception {
        if (redissonClient != null) {
            redissonClient.shutdown();
        }
    }

    /** Converts "host1:port,host2:port" into an array of redis:// addresses. */
    private String[] getRedissonNodes(String hosts) {
        List<String> nodes = new ArrayList<>();
        if (hosts == null || hosts.isEmpty()) {
            return null;
        }
        String nodexPrefix = "redis://";
        String[] arr = StringUtils.split(hosts, ",");
        for (String host : arr) {
            nodes.add(nodexPrefix + host);
        }
        return nodes.toArray(new String[nodes.size()]);
    }
}
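The sharding step in the MapFunction above can be tried in isolation. The sketch below is an approximation, not the article's exact computation: it uses the JDK's `java.util.zip.CRC32` instead of Guava's `Hashing.crc32()` and takes the modulo of the unsigned checksum, so the shard numbers it produces are not guaranteed to match the ones the MapFunction writes to Redis. The class name `ShardKeySketch` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Maps a user id to one of a fixed number of Redis map keys. */
final class ShardKeySketch {

    static final String PREFIX = "FLINK:BITINDEX:SHARDING:";
    static final int SHARDS = 100;

    /** Returns the shard number in the range [0, SHARDS). */
    static int shardOf(String userId) {
        CRC32 crc = new CRC32();
        crc.update(userId.getBytes(StandardCharsets.UTF_8));
        // getValue() is an unsigned 32-bit checksum held in a long, so the modulo is never negative.
        return (int) (crc.getValue() % SHARDS);
    }

    /** Returns the Redis key of the map that holds this user id. */
    static String shardKey(String userId) {
        return PREFIX + shardOf(userId);
    }

    public static void main(String[] args) {
        // The same id always lands in the same map, which is all the lookup needs.
        System.out.println(shardKey("user-1001"));
        System.out.println(shardKey("user-1001").equals(shardKey("user-1001"))); // prints true
    }
}
```

Whatever hash is chosen, the streaming job and any job that pre-populates the maps have to use the same one, otherwise lookups miss.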
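Deduplication logic
Once the MapFunction has produced the BitIndex for the field, the deduplication itself is straightforward. For example, to count the number of distinct visitors of a page: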
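import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CountDistinctFunction extends KeyedProcessFunction<Tuple, Tuple3<String, String, Integer>, Tuple2<String, Long>> {

    private static final Logger LOG = LoggerFactory.getLogger(CountDistinctFunction.class);

    /** f0: bitmap of the bitIndexes seen for the current key; f1: time the last timer was scheduled from. */
    private ValueState<Tuple2<RoaringBitmap, Long>> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "myState", Types.TUPLE(Types.GENERIC(RoaringBitmap.class), Types.LONG)));
    }

    @Override
    public void processElement(Tuple3<String, String, Integer> in, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        // Retrieve the current state
        Tuple2<RoaringBitmap, Long> current = state.value();
        if (current == null) {
            current = new Tuple2<>();
            current.f0 = new RoaringBitmap();
        }
        current.f0.add(in.f2);

        // Schedule an output at most once every 10 seconds per key
        long processingTime = ctx.timerService().currentProcessingTime();
        if (current.f1 == null || current.f1 + 10000 <= processingTime) {
            current.f1 = processingTime;
            ctx.timerService().registerProcessingTimeTimer(current.f1 + 10000);
        }
        // Write the state back
        state.update(current);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
        Tuple2<RoaringBitmap, Long> result = state.value();
        result.f0.runOptimize();
        out.collect(new Tuple2<>(key.f0, result.f0.getLongCardinality()));
    }
}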
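Stripped of Flink state and timers, the function above keeps one bitmap per page and reports its cardinality. The following is a minimal sketch of that core, assuming a `HashMap` in place of keyed state and `java.util.BitSet` in place of RoaringBitmap; the class name `PageUvSketch` is hypothetical.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/** One bitmap per page; the number of set bits is the page's distinct visitor count. */
final class PageUvSketch {

    private final Map<String, BitSet> bitmapByPage = new HashMap<>();

    /** Records that the user with this bitIndex visited the page. */
    void add(String pageId, int bitIndex) {
        bitmapByPage.computeIfAbsent(pageId, p -> new BitSet()).set(bitIndex);
    }

    /** Returns the distinct visitor count of the page, or 0 if the page was never seen. */
    int uv(String pageId) {
        BitSet bitmap = bitmapByPage.get(pageId);
        return bitmap == null ? 0 : bitmap.cardinality();
    }

    public static void main(String[] args) {
        PageUvSketch sketch = new PageUvSketch();
        sketch.add("home", 1);
        sketch.add("home", 2);
        sketch.add("home", 1); // repeat visit by the same user
        sketch.add("cart", 1);
        System.out.println(sketch.uv("home")); // prints 2
        System.out.println(sketch.uv("cart")); // prints 1
    }
}
```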
The main part of the driver program:
env.addSource(source)
        .map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                String[] arr = StringUtils.split(value, ",");
                return new Tuple2<>(arr[0], arr[1]);
            }
        })
        .keyBy(0) // group by userId
        .map(new BitIndexBuilderMap()) // build the bitindex
        .keyBy(1) // count the visitors per page
        .process(new CountDistinctFunction())
        .print();
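Summary
If your field is already numeric, you can skip building the BitIndex, but make sure the values follow a regular pattern and increase monotonically. For long values you can also use Roaring64NavigableMap. However, if the ids are generated by the Snowflake algorithm, it is best not to use them directly: they cannot be compressed and take up a great deal of space. I once used Roaring64NavigableMap directly on such ids, and a little over 10 million ids took more than 700 MB.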
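One way to see why such ids compress badly: RoaringBitmap stores 32-bit values in containers keyed by their upper 16 bits, and Roaring64NavigableMap first splits a long into its upper and lower 32 bits, so only ids that fall in the same 2^16-wide range can share a container. The sketch below counts the distinct `id >>> 16` values as a rough proxy for the number of containers. It is not a memory measurement, and the Snowflake-style bit layout used here (timestamp shifted left by 22 bits, one id per millisecond) is an assumption for illustration.

```java
import java.util.HashSet;
import java.util.Set;

/** Compares how many 2^16-wide chunks dense and Snowflake-style ids are spread over. */
final class ChunkCountSketch {

    /** Number of distinct chunks (id >>> 16) that the ids fall into. */
    static int chunks(long[] ids) {
        Set<Long> keys = new HashSet<>();
        for (long id : ids) {
            keys.add(id >>> 16);
        }
        return keys.size();
    }

    /** Dense ids 1..n, as produced by an incrementing bitIndex. */
    static long[] sequentialIds(int n) {
        long[] ids = new long[n];
        for (int i = 0; i < n; i++) {
            ids[i] = i + 1L;
        }
        return ids;
    }

    /** Hypothetical Snowflake-style ids: (millis << 22) | (worker << 12), one id per millisecond. */
    static long[] snowflakeLikeIds(int n, long startMillis) {
        long[] ids = new long[n];
        for (int i = 0; i < n; i++) {
            ids[i] = ((startMillis + i) << 22) | (1L << 12);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(chunks(sequentialIds(100_000)));               // prints 2
        System.out.println(chunks(snowflakeLikeIds(100_000, 1_000_000L))); // prints 100000
    }
}
```

With dense ids, 100,000 values share two chunks; with the Snowflake-style ids every value sits alone in its own chunk, so per-container overhead dominates.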
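With a very large data volume, generating the bitindex on the fly in the implementation above becomes a performance bottleneck, so the BitIndex should be built in advance: run a Flink batch job over all user ids in your database to generate the mapping beforehand. The basic code is as follows: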
// main method
final ExecutionEnvironment env = buildExecutionEnv();
// If there is no better way to keep the ids monotonically increasing, use a parallelism of 1
env.setParallelism(1);

TextInputFormat input = new TextInputFormat(new Path(MEMBER_RIGHTS_HISTORY_PATH));
input.setCharsetName("UTF-8");
DataSet<String> source = env.createInput(input)
        .filter(e -> !e.startsWith("user_id"))
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] arr = StringUtils.split(value, ",");
                return arr[0];
            }
        })
        .distinct();

source.map(new RedisMapBuilderFunction())
        .groupBy(0)
        .reduce(new RedisMapBuilderReduce())
        .output(new RedissonOutputFormat());

// Seed the global counter with the number of pre-assigned ids
long counter = source.count();
env.fromElements(counter)
        .map(new MapFunction<Long, Tuple3<String, String, Object>>() {
            @Override
            public Tuple3<String, String, Object> map(Long value) throws Exception {
                return new Tuple3<>("FLINK:GLOBAL:BITINDEX", "ATOMICLONG", value);
            }
        })
        .output(new RedissonOutputFormat());
// count() already ran the plan defined before it; this last sink needs its own execute()
env.execute();

// Note: the sharding logic and the keys must match those of the streaming job
public class RedisMapBuilderFunction implements MapFunction<String, Tuple3<String, String, Object>> {

    private static final String USER_BITINDEX_SHARDING_KEY = "FLINK:BITINDEX:SHARDING:";
    private static final Integer REDIS_CLUSTER_SHARDING_MODE = 100;

    private HashFunction hash = Hashing.crc32();
    private Integer counter = 0;

    @Override
    public Tuple3<String, String, Object> map(String userId) throws Exception {
        counter++;
        int shardingNum = Math.abs(hash.hashBytes(userId.getBytes()).asInt()) % REDIS_CLUSTER_SHARDING_MODE;
        String key = USER_BITINDEX_SHARDING_KEY + shardingNum;
        Map<String, String> map = new HashMap<>();
        map.put(userId, String.valueOf(counter));
        return new Tuple3<>(key, "MAP", map);
    }
}

public class RedisMapBuilderReduce implements ReduceFunction<Tuple3<String, String, Object>> {

    @Override
    public Tuple3<String, String, Object> reduce(Tuple3<String, String, Object> value1, Tuple3<String, String, Object> value2) throws Exception {
        // Merge the per-user maps that belong to the same shard key
        Map<String, String> map1 = (Map<String, String>) value1.f2;
        Map<String, String> map2 = (Map<String, String>) value2.f2;
        map1.putAll(map2);
        return new Tuple3<>(value1.f0, value1.f1, map1);
    }
}

// Output to Redis
public class RedissonOutputFormat extends RichOutputFormat<Tuple3<String, String, Object>> {

    private transient RedissonClient redissonClient;

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        Config config = new Config();
        config.setCodec(new StringCodec());
        config.useClusterServers()
                .addNodeAddress(getRedissonNodes("redis1:8080,redis2:8080,redis3:8080"))
                .setPassword("xxx")
                .setMasterConnectionPoolSize(2)
                .setMasterConnectionMinimumIdleSize(1)
                .setSlaveConnectionPoolSize(2)
                .setSlaveConnectionMinimumIdleSize(1)
                .setConnectTimeout(10000)
                .setTimeout(10000)
                .setIdleConnectionTimeout(10000);
        redissonClient = Redisson.create(config);
    }

    /**
     * Writes one record of the form (key, type, value); any existing key is replaced.
     */
    @Override
    public void writeRecord(Tuple3<String, String, Object> record) throws IOException {
        String key = record.f0;
        RKeys rKeys = redissonClient.getKeys();
        rKeys.delete(key);
        String keyType = record.f1;
        if ("STRING".equalsIgnoreCase(keyType)) {
            String value = (String) record.f2;
            RBucket<String> rBucket = redissonClient.getBucket(key);
            rBucket.set(value);
        } else if ("MAP".equalsIgnoreCase(keyType)) {
            Map<String, String> map = (Map<String, String>) record.f2;
            RMap<String, String> rMap = redissonClient.getMap(key);
            rMap.putAll(map);
        } else if ("ATOMICLONG".equalsIgnoreCase(keyType)) {
            long l = (long) record.f2;
            RAtomicLong atomic = redissonClient.getAtomicLong(key);
            atomic.set(l);
        }
    }

    @Override
    public void close() throws IOException {
        if (redissonClient != null) {
            redissonClient.shutdown();
        }
    }

    /** Converts "host1:port,host2:port" into an array of redis:// addresses. */
    private String[] getRedissonNodes(String hosts) {
        List<String> nodes = new ArrayList<>();
        if (hosts == null || hosts.isEmpty()) {
            return null;
        }
        String nodexPrefix = "redis://";
        String[] arr = StringUtils.split(hosts, ",");
        for (String host : arr) {
            nodes.add(nodexPrefix + host);
        }
        return nodes.toArray(new String[nodes.size()]);
    }
}
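The essence of the batch job above is that distinct ids receive 1, 2, 3, ... and the global counter is then set to the number of ids assigned, so that indexes generated later by the streaming job continue without collisions. A minimal in-memory sketch of that bookkeeping, with hypothetical names and no Flink or Redis involved:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Pre-assigns bit indexes to known user ids and derives the counter seed. */
final class PrebuildSketch {

    /** Assigns 1, 2, 3, ... in first-seen order; a repeated id keeps its first index. */
    static Map<String, Integer> assign(List<String> userIds) {
        Map<String, Integer> indexById = new LinkedHashMap<>();
        for (String id : userIds) {
            indexById.putIfAbsent(id, indexById.size() + 1);
        }
        return indexById;
    }

    /** Value to store in the global counter so that the next increment yields an unused index. */
    static long counterSeed(Map<String, Integer> indexById) {
        return indexById.size();
    }

    public static void main(String[] args) {
        Map<String, Integer> indexById = assign(Arrays.asList("u1", "u2", "u1", "u3"));
        System.out.println(indexById);              // prints {u1=1, u2=2, u3=3}
        System.out.println(counterSeed(indexById)); // prints 3
    }
}
```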




