暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

拒绝背锅,FlinkCDC采集表操作日志

大数据技能圈 2023-06-22
45

公司的数据平台平时会采集各个业务系统的数据,经过 ETL 加工处理后提供给各种报表和大屏展示。有个小兄弟负责了其中一部分业务。

前两天,小兄弟负责的某一项数据出现了跟业务系统不一致的情况。经排查,我们的程序和日志都没有任何异常,完全是按正确的处理逻辑在运行,那么答案只有一个:数据从业务系统过来的时候就有问题。但是业务操作的同事坚持说没有任何误操作。好在之前写了个数据操作记录程序,将操作记录保存了下来。按时间查询到可能出现问题的数据,结果显示是业务同事在中途手动修改了中间的某个值。小兄弟说还好有记录。

今天把这个监控程序分享给各位同学。如果有比较重要的数据场景,可以用它来做监控。

    // Row image of the captured table (one snapshot of the row's columns).
    /**
     * Holds the column values of one row of the captured table. The CDC job uses one
     * instance for the row state before a change and one for the state after it.
     *
     * <p>The class is named {@code XXXDemo} because that is the name the other classes in
     * this example reference. Java type names are case-sensitive, so the previous
     * declaration {@code xxxDemo} did not resolve.
     *
     * <p>{@code field1}..{@code field7} are placeholder column names. Rename them to match
     * the real table.
     */
    public class XXXDemo {
        private String id;
        private String field1;
        private String field2;
        private String field3;
        private String field4;
        private String field5;
        private String field6;
        private String field7;

        /** @return the row's id column value, or null if unset */
        public String getId() {
            return id;
        }

        /** @param id the row's id column value */
        public void setId(String id) {
            this.id = id;
        }

        public String getField1() {
            return field1;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }

        public String getField2() {
            return field2;
        }

        public void setField2(String field2) {
            this.field2 = field2;
        }

        public String getField3() {
            return field3;
        }

        public void setField3(String field3) {
            this.field3 = field3;
        }

        public String getField4() {
            return field4;
        }

        public void setField4(String field4) {
            this.field4 = field4;
        }

        public String getField5() {
            return field5;
        }

        public void setField5(String field5) {
            this.field5 = field5;
        }

        public String getField6() {
            return field6;
        }

        public void setField6(String field6) {
            this.field6 = field6;
        }

        public String getField7() {
            return field7;
        }

        public void setField7(String field7) {
            this.field7 = field7;
        }

        /**
         * Debug representation. The label now names this class; it previously said
         * "TmpOperateLog", a leftover from another class.
         */
        @Override
        public String toString() {
            return "XXXDemo{" +
                    "id='" + id + '\'' +
                    ", field1='" + field1 + '\'' +
                    ", field2='" + field2 + '\'' +
                    ", field3='" + field3 + '\'' +
                    ", field4='" + field4 + '\'' +
                    ", field5='" + field5 + '\'' +
                    ", field6='" + field6 + '\'' +
                    ", field7='" + field7 + '\'' +
                    '}';
        }
    }


      # 记录操作变化
      public class MysqlCDCContent {
      private XXXDemo before;
      private XXXDemo after;
      private String db;
      private String table;
      private String op;
      private String ts_ms;


      public XXXDemo getBefore() {
      return before;
      }


      public void setBefore(XXXDemo before) {
      this.before = before;
      }


      public XXXDemo getAfter() {
      return after;
      }


      public void setAfter(XXXDemo after) {
      this.after = after;
      }


      public String getDb() {
      return db;
      }


      public void setDb(String db) {
      this.db = db;
      }


      public String getTable() {
      return table;
      }


      public void setTable(String table) {
      this.table = table;
      }


      public String getOp() {
      return op;
      }


      public void setOp(String op) {
      this.op = op;
      }


      public String getTs_ms() {
      return ts_ms;
      }


      public void setTs_ms(String ts_ms) {
      this.ts_ms = ts_ms;
      }


      @Override
      public String toString() {
      return "MysqlCDCContent{" +
      "before=" + before +
      ", after=" + after +
      ", db='" + db + '\'' +
      ", table='" + table + '\'' +
      ", op='" + op + '\'' +
      ", ts_ms='" + ts_ms + '\'' +
      '}';
      }
      }


        public class MysqlCDC {
        public static void main(String[] args) throws Exception {
        /*
        * decimal数据类型在binlog中会转化成base64
        * 以下配置是用来对采集到binlog中的base64数据转化成String类型
        * */
        Properties properties = new Properties();
        properties.put("decimal.handling.mode", "string");
        Map config = new HashMap();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(true, config);
                // 从mysql获取数据
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                        .hostname("x.x.x.x")
        .port(3306)
        .databaseList("xxxx") // set captured database
        .tableList("xxx.xxx") // set captured table
        .username("xxxx")
        .password("xxxx")
        .serverTimeZone("Asia/Shanghai")
        .startupOptions(StartupOptions.latest())
        .debeziumProperties(properties)
        .deserializer(jdd) // converts SourceRecord to JSON String
                        .build();


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        DataStream<String> output = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        .setParallelism(1).setParallelism(1);
        DataStream<String> output1 = output.map(new MapFunction<String, String>() {
        @Override
        public String map(String s) throws Exception {
        /*
        * 解析binlog数据放到TmpOperateLog对象中
        * */
        JSONObject json = JSON.parseObject(s);
        MysqlCDCContent mysqlCDCContent = new MysqlCDCContent();
                        XXXDemo xXXDemo_before = new XXXDemo ();
                        XXXDemo xXXDemo_after = new XXXDemo ();
        if(null != json.getJSONObject("before")){
        String before_id = json.getJSONObject("before").getString("id");
        String filed1 = json.getJSONObject("before").getString("filed1");
                            String filed2 = json.getJSONObject("before").getString("filed2");
                            String filed3 = json.getJSONObject("before").getString("filed3");
                            String filed4 = json.getJSONObject("before").getString("filed4");
                            String filed5 = json.getJSONObject("before").getString("filed5");
                            String filed6 = json.getJSONObject("before").getString("filed6");
                            String filed7 = json.getJSONObject("before").getString("filed7");


        xXXDemo_before.setId(before_id);
                            xXXDemo_before.setField1(filed1);
                            xXXDemo_before.setField2(filed2);
        xXXDemo_before.setField3(filed3);
        xXXDemo_before.setField4(filed4);
        xXXDemo_before.setField5(filed5);
        xXXDemo_before.setField6(filed6);
                            xXXDemo_before.setField7(filed7);
        }


        if(null != json.getJSONObject("after")){
        String after_id = json.getJSONObject("after").getString("id");
        String filed1 = json.getJSONObject("after").getString("filed1");
        String filed2 = json.getJSONObject("after").getString("filed2");
        String filed3 = json.getJSONObject("after").getString("filed3");
        String filed4 = json.getJSONObject("after").getString("filed4");
        String filed5 = json.getJSONObject("after").getString("filed5");
        String filed6 = json.getJSONObject("after").getString("filed6");
        String filed7 = json.getJSONObject("after").getString("filed7");


        xXXDemo_after.setId(after_id);
        xXXDemo_after.setField1(filed1);
        xXXDemo_after.setField2(filed2);
        xXXDemo_after.setField3(filed3);
        xXXDemo_after.setField4(filed4);
        xXXDemo_after.setField5(filed5);
        xXXDemo_after.setField6(filed6);
        xXXDemo_after.setField7(filed7);
        }


        String db = json.getJSONObject("source").getString("db");
        String table = json.getJSONObject("source").getString("table");
        String op = json.getString("op");
        String ts_ms = json.getString("ts_ms");


        mysqlCDCContent.setBefore(xXXDemo_before);
        mysqlCDCContent.setAfter(xXXDemo_after);
        mysqlCDCContent.setDb(db);
        mysqlCDCContent.setTable(table);
        mysqlCDCContent.setOp(op);
        mysqlCDCContent.setTs_ms(ts_ms);
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsString(mysqlCDCContent);
        }
        });
                //发送到kafka,这里也可以发送到数据库等各个下游
        KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("x.x.x.x:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("test-topic")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build()
                        )
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
        output1.sinkTo(sink);
        env.execute("test");
        }
        }

        更多大数据相关内容请关注大数据技能圈公众号:

        文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论