公司的数据平台平时会采集各个业务系统的数据,经过ETL加工处理后提供给各种报表和大屏展示。有个小兄弟就做了其中一部分业务。
前两天小兄弟做的其中某一项数据出现了跟业务系统不一致的情况。经排查,我们的程序和日志都没有任何异常,完全是按正确的处理逻辑在运行,那么答案只有一个:数据从业务系统过来的时候就有问题。但是业务操作的同事坚持没有任何误操作,好在之前写了个数据操作记录程序,将操作记录保存了下来。按时间查询到可能出现问题的数据,结果显示是其在中途手动改了中间的某个值。小兄弟说还好有记录。
今天把这个监控的程序分享给各位同学,有比较重要的数据场景,可以用来做监控。
/**
 * Records the table data that changed: one row image of the monitored table.
 *
 * <p>Holds the row's {@code id} plus seven string columns ({@code field1} ..
 * {@code field7}). It is a plain mutable bean with a getter/setter pair per
 * column, and is used as the {@code before} / {@code after} image inside
 * {@code MysqlCDCContent}.
 *
 * <p>The class is named {@code XXXDemo} (not {@code xxxDemo}) because that is
 * the spelling every other class in this file refers to, and Java type names
 * are case-sensitive.
 */
public class XXXDemo {

    private String id;
    private String field1;
    private String field2;
    private String field3;
    private String field4;
    private String field5;
    private String field6;
    private String field7;

    /** @return the row id, or {@code null} if not set */
    public String getId() {
        return id;
    }

    /** @param id the row id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return value of column field1, or {@code null} if not set */
    public String getField1() {
        return field1;
    }

    /** @param field1 value of column field1 */
    public void setField1(String field1) {
        this.field1 = field1;
    }

    /** @return value of column field2, or {@code null} if not set */
    public String getField2() {
        return field2;
    }

    /** @param field2 value of column field2 */
    public void setField2(String field2) {
        this.field2 = field2;
    }

    /** @return value of column field3, or {@code null} if not set */
    public String getField3() {
        return field3;
    }

    /** @param field3 value of column field3 */
    public void setField3(String field3) {
        this.field3 = field3;
    }

    /** @return value of column field4, or {@code null} if not set */
    public String getField4() {
        return field4;
    }

    /** @param field4 value of column field4 */
    public void setField4(String field4) {
        this.field4 = field4;
    }

    /** @return value of column field5, or {@code null} if not set */
    public String getField5() {
        return field5;
    }

    /** @param field5 value of column field5 */
    public void setField5(String field5) {
        this.field5 = field5;
    }

    /** @return value of column field6, or {@code null} if not set */
    public String getField6() {
        return field6;
    }

    /** @param field6 value of column field6 */
    public void setField6(String field6) {
        this.field6 = field6;
    }

    /** @return value of column field7, or {@code null} if not set */
    public String getField7() {
        return field7;
    }

    /** @param field7 value of column field7 */
    public void setField7(String field7) {
        this.field7 = field7;
    }

    /**
     * Renders all columns for logging/debugging. Unset columns print as
     * {@code 'null'}.
     */
    @Override
    public String toString() {
        // The label matches the class name; it previously named a different class.
        return "XXXDemo{"
                + "id='" + id + '\''
                + ", field1='" + field1 + '\''
                + ", field2='" + field2 + '\''
                + ", field3='" + field3 + '\''
                + ", field4='" + field4 + '\''
                + ", field5='" + field5 + '\''
                + ", field6='" + field6 + '\''
                + ", field7='" + field7 + '\''
                + '}';
    }
}
# 记录操作变化public class MysqlCDCContent {private XXXDemo before;private XXXDemo after;private String db;private String table;private String op;private String ts_ms;public XXXDemo getBefore() {return before;}public void setBefore(XXXDemo before) {this.before = before;}public XXXDemo getAfter() {return after;}public void setAfter(XXXDemo after) {this.after = after;}public String getDb() {return db;}public void setDb(String db) {this.db = db;}public String getTable() {return table;}public void setTable(String table) {this.table = table;}public String getOp() {return op;}public void setOp(String op) {this.op = op;}public String getTs_ms() {return ts_ms;}public void setTs_ms(String ts_ms) {this.ts_ms = ts_ms;}@Overridepublic String toString() {return "MysqlCDCContent{" +"before=" + before +", after=" + after +", db='" + db + '\'' +", table='" + table + '\'' +", op='" + op + '\'' +", ts_ms='" + ts_ms + '\'' +'}';}}
public class MysqlCDC {public static void main(String[] args) throws Exception {/** decimal数据类型在binlog中会转化成base64* 以下配置是用来对采集到binlog中的base64数据转化成String类型* */Properties properties = new Properties();properties.put("decimal.handling.mode", "string");Map config = new HashMap();config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(true, config);// 从mysql获取数据MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("x.x.x.x").port(3306).databaseList("xxxx") // set captured database.tableList("xxx.xxx") // set captured table.username("xxxx").password("xxxx").serverTimeZone("Asia/Shanghai").startupOptions(StartupOptions.latest()).debeziumProperties(properties).deserializer(jdd) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> output = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(1).setParallelism(1);DataStream<String> output1 = output.map(new MapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {/** 解析binlog数据放到TmpOperateLog对象中* */JSONObject json = JSON.parseObject(s);MysqlCDCContent mysqlCDCContent = new MysqlCDCContent();XXXDemo xXXDemo_before = new XXXDemo ();XXXDemo xXXDemo_after = new XXXDemo ();if(null != json.getJSONObject("before")){String before_id = json.getJSONObject("before").getString("id");String filed1 = json.getJSONObject("before").getString("filed1");String filed2 = json.getJSONObject("before").getString("filed2");String filed3 = json.getJSONObject("before").getString("filed3");String filed4 = json.getJSONObject("before").getString("filed4");String filed5 = json.getJSONObject("before").getString("filed5");String filed6 = json.getJSONObject("before").getString("filed6");String filed7 = 
json.getJSONObject("before").getString("filed7");xXXDemo_before.setId(before_id);xXXDemo_before.setField1(filed1);xXXDemo_before.setField2(filed2);xXXDemo_before.setField3(filed3);xXXDemo_before.setField4(filed4);xXXDemo_before.setField5(filed5);xXXDemo_before.setField6(filed6);xXXDemo_before.setField7(filed7);}if(null != json.getJSONObject("after")){String after_id = json.getJSONObject("after").getString("id");String filed1 = json.getJSONObject("after").getString("filed1");String filed2 = json.getJSONObject("after").getString("filed2");String filed3 = json.getJSONObject("after").getString("filed3");String filed4 = json.getJSONObject("after").getString("filed4");String filed5 = json.getJSONObject("after").getString("filed5");String filed6 = json.getJSONObject("after").getString("filed6");String filed7 = json.getJSONObject("after").getString("filed7");xXXDemo_after.setId(after_id);xXXDemo_after.setField1(filed1);xXXDemo_after.setField2(filed2);xXXDemo_after.setField3(filed3);xXXDemo_after.setField4(filed4);xXXDemo_after.setField5(filed5);xXXDemo_after.setField6(filed6);xXXDemo_after.setField7(filed7);}String db = json.getJSONObject("source").getString("db");String table = json.getJSONObject("source").getString("table");String op = json.getString("op");String ts_ms = json.getString("ts_ms");mysqlCDCContent.setBefore(xXXDemo_before);mysqlCDCContent.setAfter(xXXDemo_after);mysqlCDCContent.setDb(db);mysqlCDCContent.setTable(table);mysqlCDCContent.setOp(op);mysqlCDCContent.setTs_ms(ts_ms);ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(mysqlCDCContent);}});//发送到kafka,这里也可以发送到数据库等各个下游KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("x.x.x.x:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("test-topic").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();output1.sinkTo(sink);env.execute("test");}}
更多大数据相关内容请关注大数据技能圈公众号:
文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




