Flink 实时统计 pv、uv 的博客,我已经写了三篇,最近这段时间又做了个尝试,用 sql 来计算全量数据的 pv、uv。
Stream Api 写实时、离线的 pv、uv ,除了要写代码没什么其他的障碍
SQL api 来写就有很多障碍,比如窗口没有 trigger,不能操作 状态,udf 不如 process 算子好用等
问题
预设两个场景的问题:
1. 按天统计 pv、uv
2. 在解决问题 1 的基础上,再解决历史 pv、uv 的统计
实现思路
有以下几种思路,来实现实时统计 pv、uv
- 直接使用 CUMULATE WINDOW 计算当日的 pv、uv
- 直接使用 CUMULATE WINDOW 计算当日的 pv、uv,再获取昨天的 pv,累加可以得到基于历史的 pv
- pv 计算同解法 2 ,uv 的计算采用 udaf,使用 bloom filter 来粗略的计算 uv
- pv 计算同解法 2 ,uv 的计算采用 udaf,用 redis 记录 user_id ,每次计算的时候获取 user_id 的数量即 uv
- pv 计算同解法 2 ,uv 的计算采用 udaf,每次启动的时候获取历史的 user_id 缓存在内存中,加上新来的 user_id 计算 uv
- 全局窗口,直接计算全量的 pv、uv (没意义,未实现)
注: 由于需要实时输出结果,SQL 都选用了 CUMULATE WINDOW
建表语句
建表语句只有 数据流表、输出表、lookup join 输出表
CREATE TABLE user_log
(
user_id VARCHAR
,item_id VARCHAR
,category_id VARCHAR
,behavior VARCHAR
,ts TIMESTAMP(3)
,proc_time as PROCTIME()
,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka'
,'topic' = 'user_log'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'latest-offset'
,'format' = 'json'
);
create table if not exists user_log_lookup_join(
cal_day varchar
,behavior varchar
,pv bigint
,uv bigint
,PRIMARY KEY (cal_day, behavior) NOT ENFORCED
) with (
'connector' = 'jdbc'
,'url' = 'jdbc:mysql://localhost:3306/venn'
,'table-name' = 'pv_uv'
,'username' = 'root'
,'password' = '123456'
,'scan.partition.column' = 'cal_day'
,'scan.partition.num' = '1'
,'scan.partition.lower-bound' = '0'
,'scan.partition.upper-bound' = '9999'
,'lookup.cache.max-rows' = '1000'
-- one day, once cache, the value will not update
,'lookup.cache.ttl' = '86400000' -- ttl time 超过这么长时间无数据才行
);
create table if not exists user_log_sink(
cal_day varchar
,behavior varchar
,start_time VARCHAR
,end_time VARCHAR
,pv bigint
,uv bigint
,last_pv bigint
,last_uv bigint
,PRIMARY KEY (cal_day, behavior) NOT ENFORCED
) with (
-- 'connector' = 'print'
'connector' = 'jdbc'
,'url' = 'jdbc:mysql://venn:3306/venn'
,'table-name' = 'pv_uv'
,'username' = 'root'
,'password' = '123456'
);
思路 1
就是个简单的 CUMULATE 的 一天的窗口,统计 count/count distinct ,窗口的触发事件是 10 秒一次
sql 如下:
insert into user_log_sink
select
date_format(window_start, 'yyyy-MM-dd') cal_day
,behavior
,date_format(window_start, 'HH:mm:ss') start_time
, date_format(window_end, 'HH:mm:ss') end_time
, count(user_id) pv
, count(distinct user_id) uv
FROM TABLE(
CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECOND, INTERVAL '1' DAY))
GROUP BY window_start, window_end, behavior
;
结论: 这个只能实时输出当天的 pv、uv,不能计算历史的 pv、uv
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




