暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink 实时统计历史 pv、uv

原创 jj 2022-06-14
1788

Flink 实时统计 pv、uv 的博客,我已经写了三篇,最近这段时间又做了个尝试,用 sql 来计算全量数据的 pv、uv。

Stream Api 写实时、离线的 pv、uv ,除了要写代码没什么其他的障碍

SQL api 来写就有很多障碍,比如窗口没有 trigger,不能操作 状态,udf 不如 process 算子好用等

问题

预设两个场景的问题:
1. 按天统计 pv、uv
2. 在解决问题 1 的基础上,再解决历史 pv、uv 的统计

实现思路

有以下几种思路,来实现实时统计 pv、uv

  1. 直接使用 CUMULATE WINDOW 计算当日的 pv、uv
  2. 直接使用 CUMULATE WINDOW 计算当日的 pv、uv,再获取昨天的 pv,累加可以得到基于历史的 pv
  3. pv 计算同解法 2 ,uv 的计算采用 udaf,使用 bloom filter 来粗略的计算 uv
  4. pv 计算同解法 2 ,uv 的计算采用 udaf,用 redis 记录 user_id ,每次计算的时候获取 user_id 的数量即 uv
  5. pv 计算同解法 2 ,uv 的计算采用 udaf,每次启动的时候获取历史的 user_id 缓存在内存中,加上新来的 user_id 计算 uv
  6. 全局窗口,直接计算全量的 pv、uv (没意义,未实现)

注: 由于需要实时输出结果,SQL 都选用了 CUMULATE WINDOW

建表语句

建表语句只有 数据流表、输出表、lookup join 输出表


CREATE TABLE user_log
(
     user_id     VARCHAR
    ,item_id     VARCHAR
    ,category_id VARCHAR
    ,behavior    VARCHAR
    ,ts          TIMESTAMP(3)
    ,proc_time   as PROCTIME()
    ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
      'connector' = 'kafka'
      ,'topic' = 'user_log'
      ,'properties.bootstrap.servers' = 'localhost:9092'
      ,'properties.group.id' = 'user_log'
      ,'scan.startup.mode' = 'latest-offset'
      ,'format' = 'json'
);

create table if not exists user_log_lookup_join(
    cal_day varchar
    ,behavior varchar
    ,pv  bigint
    ,uv  bigint
    ,PRIMARY KEY (cal_day, behavior) NOT ENFORCED
    ) with (
          'connector' = 'jdbc'
          ,'url' = 'jdbc:mysql://localhost:3306/venn'
          ,'table-name' = 'pv_uv'
          ,'username' = 'root'
          ,'password' = '123456'
          ,'scan.partition.column' = 'cal_day'
          ,'scan.partition.num' = '1'
          ,'scan.partition.lower-bound' = '0'
          ,'scan.partition.upper-bound' = '9999'
          ,'lookup.cache.max-rows' = '1000'
        -- one day, once cache, the value will not update
          ,'lookup.cache.ttl' = '86400000' -- ttl time 超过这么长时间无数据才行
    );


create table if not exists user_log_sink(
    cal_day varchar
    ,behavior varchar
    ,start_time VARCHAR
    ,end_time VARCHAR
    ,pv  bigint
    ,uv  bigint
    ,last_pv  bigint
    ,last_uv  bigint
    ,PRIMARY KEY (cal_day, behavior) NOT ENFORCED
) with (
--      'connector' = 'print'
      'connector' = 'jdbc'
      ,'url' = 'jdbc:mysql://venn:3306/venn'
      ,'table-name' = 'pv_uv'
      ,'username' = 'root'
      ,'password' = '123456'
);


思路 1

就是个简单的 CUMULATE 的 一天的窗口,统计 count/count distinct ,窗口的触发事件是 10 秒一次

sql 如下:


insert into user_log_sink
select
 date_format(window_start, 'yyyy-MM-dd') cal_day
 ,behavior
 ,date_format(window_start, 'HH:mm:ss') start_time
 , date_format(window_end, 'HH:mm:ss') end_time
 , count(user_id) pv
 , count(distinct user_id) uv
FROM TABLE(
    CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECOND, INTERVAL '1' DAY))
  GROUP BY window_start, window_end, behavior
;

结论: 这个只能实时输出当天的 pv、uv,不能计算历史的 pv、uv

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论