暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

基于准实时规则引擎的业务风控实践

后端搬运工 2022-08-30
789

在当今复杂的互联网环境下,我们的系统时时刻刻都暴露在风险(刷单党、羊毛党)的攻击之中,如果我们不采取有效防御措施,那么这些风险就会对业务造成很大的损失。

用公式可以表达出风控规则和风险数据的系统关系:z=f(x, y),f 为系统风控规则,x 为系统实时输入风险数据,y 为系统的事实数据。

挑战性

  • 数据量大,计算延时严重

  • 风控策略多变

目标

  • 准确及时识别风险

  • 采取有效防御措施

总体架构

基于大数据实时计算可热更新的通用规则引擎,搭建一套业务风控系统。

业务风险:刷单(订单)、薅羊毛(活动)、恶意注册和异常登录(用户)

业务服务

  • 风控系统:识别业务风险,根据业务或埋点数据判断当前用户或事件有无风险;

  • 惩罚系统:对系统风险操作进行控制或惩罚,如禁止下单、增加验证码、限制登录;

  • 分析系统(管理系统):提供系统管理数据展示分析。系统管理如规则管理,分析业务数据如风险的订单,分析系统指标数据如某策略规则拦截率,以及时修正策略规则;


系统引擎

  • 规则引擎:策略规则解析执行,选用 B 站开源的 gengine(golang) 规则引擎;

  • 大数据计算引擎:实时在线或离线计算业务指标数据,选用 Flink + Kafka 流计算引擎,指标数据存储在 Redis(数据异构)。各云厂商提供相应服务,如腾讯云流计算 Oceanus


消息中心:各系统之间通过事件驱动,选用 Kafka

存储:

  • MySQL:风控规则等

  • Redis:指标数据

  • Mongo:操作日志、快照等

系统工作流程

包含 3 个数据流。

  • 实时风控数据流:由红线标识。业务同步调用风控系统,返回风险识别结果,并作相应惩罚,为系统核心链路;

  • 准实时指标数据流:由蓝线标识。大数据计算引擎实时异步写入,准备业务指标数据并存储在 Redis,为系统准实时链路;

  • 准实时/离线分析数据流:由绿线标识。异步写入,生成业务报表和评估风控系统表现的数据,以供进行数据分析;

风控规则抽象

风控规则通常分 2 种,即统计规则和主体属性规则。都可以抽象为通用公式:

  • 统计规则:{某时间段},{某主体} 在 {某个统计维度的结果} {比较操作符} {阈值}

  • 主体属性规则:{主体}.{属性名}

下文将以 1天内同一患者ID订单数超过5笔 规则进行示例和说明。

大数据实时计算引擎

Flink 输入数据为 JSON 格式,Flink 的数据源有 2 种:

  • 业务事件 -> Kafka -> Flink,业务事件需要转化为 JSON 消息格式

  • 业务数据 -> DTS -> Kafka -> Flink,支持全量和增量读取数据

数据指标存储

指标数据异构,用空间换时间。Redis 的 zset 结构,通过 ZCOUNT key startTime endTime
 操作即可统计任意时间段 startTime 至 endTime 内的统计需求。

规则实现写操作读操作

1天内同一

患者ID订单数

超过5笔

key:患者id
value:订单id
score:下单时间
ZADD
O(M*log(N)
ZCOUNT
O(log(N)+M)

形如 1天内同一患者ID订单数超过5笔 规则,数据指标存储格式:

    risk:order-patient-id:123456
    |--111111 1652929153
    |--222222 1652932753
    |--333333 1652939953

    统计指标为:

      // startTime 和 endTime 对应为1天时间间隔
      ZCOUNT risk:order-patient-id:123456 1652940185 1652853785
      3

      随着时间的推移,zset 会出现元素越来越多的情况,后续可以通过定期升级 key 版本号的方式来解决,每次升级版本号之后需要批处理初始化所有指标数据。

      创建指标数据实时计算作业

      选用 Flink 的 SQL 作业类型,见 创建 SQL 作业。
      形如 1天内同一患者ID订单数超过5笔 规则,定义源表和目标表是为了 SQL 中方便使用。

      数据源表

      定义 Kafka 数据源表,字段跟数据表一一对应映射

        CREATE TABLE `risk_input_order` (
        `id` INT,
        `pid` INT,
        `doctor_id` INT,
        `patient_id` INT,
        `deliver_phone` VARCHAR(20),
        `deliver_name` VARCHAR(20),
        `delivery_address` VARCHAR(200),
        `prescription_id` INT,
        `total_price` INT,
        `is_test` TINYINT,
        `created_at` TIMESTAMP
        ) WITH (
        -- 定义 Kafka 参数
        'connector' = 'kafka',
        'topic' = 'med_dts_b_convert', -- 消费的 Topic
        'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset earliest-offset specific-offsets group-offsets timestamp 的任何一种
            'properties.bootstrap.servers' = '172.0.0.1:9092',  -- Kafka 连接地址
        'properties.group.id' = 'risk_input_order', -- 指定 Group ID


        -- 定义数据格式 (JSON 格式)
        'format' = 'json',
        'json.fail-on-missing-field' = 'false', -- 遇到缺失字段不会报错
        'json.ignore-parse-errors' = 'true' -- 忽略任何解析报错
        );

        目标表

        定义 Redis 目标表,对应 ZADD key value score
         操作写入数据。

          CREATE TABLE `risk_output_order_patient_id` (  
          `key` STRING,
          `value` STRING
          `score` DOUBLE
          ) WITH (
          'connector' = 'redis',
          'command' = 'zadd', -- redis zadd命令写入数据
              'nodes' = '172.0.0.1:6379',  -- redis连接地址。
          'password' = 'yourpassword'
          );

          数据计算逻辑

          直接使用 SQL 来清洗合并数据。

            -- 清洗患者id维度订单数据
            INSERT INTO risk_output_order_patient_id
            SELECT
            CONCAT('risk:order-patient-id:', patient_id) AS `key`,
            id AS value,
            UNIX_TIMESTAMP(created_at) AS score
            FROM risk_input_order
            WHERE pid = 0;

            流批一体(流处理和批处理一套逻辑)

            在首次初始化指标数据或者新增数据指标的场景下,需要支持读取全量和增量数据,流批一体后,这样无需维护两套流程。

            源表数据过期时间

            其实要做到流批一体,只需要 Flink 源表历史数据的过期时间不小于指标数据统计周期即可。系统 Kafka 消息过期时间增大为 30 天,则指标数据统计周期最大也为 30 天(已经满足风控规则要求),因此系统是可以做到流批一体的。

            初始化和新增指标数据

            因为在风控场景,规则中的指标数据只需要最近统计周期时间的数据,可以直接重置 Kafka 消息位点来批处理源表历史数据,即可清洗出对应的指标数据。

              CREATE TABLE `risk_input_order` (
              ) WITH (
              -- 定义 Kafka 参数
              'connector' = 'kafka',
              'scan.startup.mode' = 'earliest-offset', -- 最早消费位点
              );

              风控系统

              封装规则引擎形成 risk-service 服务,供业务直接调用。
              风险识别流程:

              接口

              对外提供业务风险识别接口、业务数据或事件上报接口。

                // 订单风控
                service RiskOrder {
                // 风险识别
                rpc CheckRisk (CheckRiskReq) returns (CheckRiskResp);
                // 数据/事件上报
                rpc Report (ReportReq) returns (ReportResp);
                }

                规则引擎

                大量的规则形成规则集,规则集组成一颗决策树,决策树是规则引擎核心的判断逻辑。

                规则体语法

                是一种自定义的 DSL 语法。支持运算符、支持基础数据类型、支持条件语句、支持并发语句块、并支持结构体和方法注入

                  // 规则名必须唯一
                  rule "rulename" "rule-describtion" salience priority
                  begin


                  //规则体


                  end

                  形如 一天内同一患者ID订单数超过5笔 规则,规则体语法为:

                    rule "pa-daily-order-count" "一天内同一患者ID订单数规则" salience 10
                    begin
                    if GetData("order-patient-id", Patient.UserId, 24*3600) > 5 {
                    return // 命中规则
                    }
                    end

                    获取事实和指标数据

                    通过对规则引擎注入预定义结构体方法,可以实现在规则体中获取事实和指标数据。

                    • 事实数据

                    对规则引擎注入 UserPatient 结构体引用(指针),在规则体中通过 {主体}.{属性} 的方式即可获取到事实的某个属性。

                      // 获取事实数据
                      user := xuser.GetUserInfo(userId)
                      patient := xuser.GetPatientInfo(userId)


                      dataContext := context.NewDataContext()
                      // 事实数据注入
                      dataContext.Add("User", user)
                      dataContext.Add("Patient", patient)

                      • 指标数据

                      对规则引擎注入 GetData() 方法,参数是一个三元组,在规则体中通过 GetData(“指标名”, “主体id”, “时间周期”) 的方式即可获得指标数据。

                        // 获取指标数据函数
                        func (s *DataService) GetData(indicator string, subjectId string, period int64) int64 {
                        key := fmt.sprintf("risk:%s:%s", indicator, subjectId)
                        endTime := time.Now().Unix()
                        startTime := endTime - period


                        err, res := redis.SCount(key, startTime, endTime)
                        if err != nil {
                        // 注入函数会忽略 error 返回值,通过错误标志来区分报错和默认值的情况
                        s.SetError(err)
                        return 0
                        }


                        return res
                        }


                        // 函数注入规则引擎
                        dataContext := context.NewDataContext()
                        dataSvc := &DataService{}
                        dataContext.Add("GetData", dataSvc.GetData)

                        执行模式

                        支持并行模式混合模式执行,目前只考虑并行模式。

                        规则编译与执行

                          dataContext := context.NewDataContext()
                          // 数据、函数注入


                          var rules []string
                          // rules from MySQL...


                          // 规则编译
                          ruleBuilder := builder.NewRuleBuilder(dataContext)
                          for _, r := range rules {
                          if err := ruleBuilder.BuildRuleWithIncremental(r); nil != err {
                          return err
                          }
                          }


                          // 规则并行模式执行
                          eng := engine.NewGengine()
                          if err := eng.ExecuteConcurrent(ruleBuilder); nil != err {
                          return err
                          }
                          if dataSvc.HasError() {
                          // 规则执行过程中,获取数据发生的报错
                          return dataSvc.FirstError()
                          }


                          // 执行结果 map[命中规则ID]interface{}
                          resMap, _ := eng.GetRulesResultMap()

                          管理系统

                          管理系统包含惩罚系统分析系统。系统功能如下:

                          怎样发布一个规则

                          一个完整的风控规则发布流程:

                          研发工程师可以在管理后台很方便地编写规则,并支持版本管理:

                          怎样接入一个新的业务风险

                          只需要做 2 件事:

                          • 生成业务指标数据

                          • 配置业务风控规则

                          系统降级

                          因为业务系统会同步调用风控系统进行风险识别,如果风控系统不可用时,则业务系统也不可用,因此需要系统降级措施

                          • 关闭场景开关

                          紧急情况可关闭场景开关,业务风险识别接口则直接返回无风险

                          文章转载自后端搬运工,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                          评论