
Flink CDC: PostgreSQL source to Iceberg sink with the DataStream API

package com.zjyg.iceberg.flink;

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeTransformations;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class Pg2Iceberg {

    private static final Schema SCHEMA =
            new Schema(
                    Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                    Types.NestedField.optional(2, "name", Types.StringType.get()),
                    Types.NestedField.optional(3, "age", Types.IntegerType.get()),
                    Types.NestedField.optional(4, "sex", Types.StringType.get()));

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setCheckpointStorage(
                parameterTool.get("checkpoint_base_dir")
                        + "/" + parameterTool.get("catalog_name")
                        + "." + parameterTool.get("iceberg_db_name")
                        + "." + parameterTool.get("iceberg_tb_name"));
        checkpointConfig.setCheckpointInterval(60 * 1000L);
        checkpointConfig.setMinPauseBetweenCheckpoints(60 * 1000L);
        checkpointConfig.setTolerableCheckpointFailureNumber(10);
        checkpointConfig.setCheckpointTimeout(120 * 1000L);

        DataStreamSource<RowData> src = env.addSource(getPgCdc(parameterTool));
        icebergSink_hadoop(src, parameterTool);
        env.execute(parameterTool.get("app_name"));
    }

    private static void icebergSink_hadoop(DataStream<RowData> src, ParameterTool tool) {
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hadoop");
        properties.put("property-version", "1");
        properties.put("warehouse", tool.get("warehouse_base_dir") + "/" + tool.get("catalog_name"));
        CatalogLoader catalogLoader =
                CatalogLoader.hadoop(tool.get("catalog_name"), new Configuration(), properties);
        icebergSink(src, tool, catalogLoader);
    }

    private static void icebergSink(DataStream<RowData> input, ParameterTool tool, CatalogLoader loader) {
        Catalog catalog = loader.loadCatalog();
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of(tool.get("iceberg_db_name")), tool.get("iceberg_tb_name"));
        Table table;
        if (catalog.tableExists(identifier)) {
            table = catalog.loadTable(identifier);
        } else {
            table = catalog.buildTable(identifier, SCHEMA)
                    .withPartitionSpec(PartitionSpec.unpartitioned())
                    .create();
        }

        // Upgrade the table to Iceberg format v2 so that equality deletes (CDC upserts) are supported.
        TableOperations operations = ((BaseTable) table).operations();
        TableMetadata metadata = operations.current();
        operations.commit(metadata, metadata.upgradeToFormatVersion(2));

        TableLoader tableLoader = TableLoader.fromCatalog(loader, identifier);
        FlinkSink.forRowData(input)
                .table(table)
                .tableLoader(tableLoader)
                .equalityFieldColumns(Arrays.asList("id"))
                .writeParallelism(1)
                .build();
    }

    private static SourceFunction<RowData> getPgCdc(ParameterTool tool) {
        TableSchema schema =
                TableSchema.builder()
                        .add(TableColumn.physical("id", DataTypes.INT()))
                        .add(TableColumn.physical("name", DataTypes.STRING()))
                        .add(TableColumn.physical("age", DataTypes.INT()))
                        .add(TableColumn.physical("sex", DataTypes.STRING()))
                        .build();
        RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
        DebeziumDeserializationSchema<RowData> deserializer =
                new RowDataDebeziumDeserializeSchema(
                        rowType,
                        createTypeInfo(schema.toRowDataType()),
                        (rowData, rowKind) -> {},
                        ZoneId.of("Asia/Shanghai"));

        Properties properties = new Properties();
        properties.setProperty("decimal.handling.mode", "string"); // convert decimal and similar types to string

        DebeziumSourceFunction<RowData> sourceFunction =
                PostgreSQLSource.<RowData>builder()
                        .hostname(tool.get("db_host"))
                        .port(Integer.parseInt(tool.get("db_port")))
                        .database(tool.get("db_name"))
                        .schemaList(tool.get("schema_name"))
                        .tableList(tool.get("schema_name") + "." + tool.get("tb_name"))
                        .username(tool.get("db_user"))
                        .password(tool.get("db_user_pwd"))
                        .decodingPluginName("pgoutput")
                        .slotName(tool.get("SLOTNAME4"))
                        .deserializer(deserializer)
                        .debeziumProperties(properties)
                        .build();
        return sourceFunction;
    }

    private static TypeInformation<RowData> createTypeInfo(DataType producedDataType) {
        final DataType internalDataType =
                DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);
        return (TypeInformation<RowData>)
                TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(internalDataType);
    }
}
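The job reads all of its settings through ParameterTool, which parses --key value pairs from the command line. As a reference, here is a minimal launcher sketch; only the key names come from the code above, and every value (hosts, paths, slot name) is a placeholder assumption.

// Hypothetical local launcher for Pg2Iceberg; all argument values are placeholders.
public class Pg2IcebergLocalRun {
    public static void main(String[] cliArgs) throws Exception {
        String[] args = {
                "--app_name", "pg2iceberg-demo",
                "--checkpoint_base_dir", "hdfs:///flink/checkpoints",
                "--warehouse_base_dir", "hdfs:///warehouse/iceberg",
                "--catalog_name", "hadoop_catalog",
                "--iceberg_db_name", "demo_db",
                "--iceberg_tb_name", "person_iceberg",
                "--db_host", "localhost",
                "--db_port", "5432",
                "--db_name", "demo",
                "--schema_name", "public",
                "--tb_name", "person",
                "--db_user", "postgres",
                "--db_user_pwd", "postgres",
                "--SLOTNAME4", "pg2iceberg_slot"   // PostgreSQL replication slot name
        };
        Pg2Iceberg.main(args);
    }
}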

Flink: Kafka source to Iceberg sink with the DataStream API

import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.coomia.datalake.kafka.KafkaUtils;

public class FlinkWriteIcebergTest {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(5000L);
        env.setParallelism(1);

        // Iceberg catalog.
        Configuration conf = new Configuration();
        Catalog catalog = new HadoopCatalog(conf);

        // Iceberg table identifier.
        TableIdentifier name =
                TableIdentifier.of("default", "iceberg-tb-" + System.currentTimeMillis());

        // Iceberg table schema: five fields, matching the row written below.
        Schema schema = new Schema(
                Types.NestedField.required(1, "uid", Types.StringType.get()),
                Types.NestedField.required(2, "eventTime", Types.LongType.get()),
                Types.NestedField.required(3, "eventid", Types.StringType.get()),
                Types.NestedField.optional(4, "uuid", Types.StringType.get()),
                Types.NestedField.required(5, "ts", Types.TimestampType.withoutZone()));

        // Iceberg table partitioning.
        // PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uid", 5).build();
        PartitionSpec spec = PartitionSpec.unpartitioned();

        // Use ORC as the storage format.
        Map<String, String> props =
                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());

        // Create the Iceberg table if it does not exist, otherwise load it.
        Table table;
        if (!catalog.tableExists(name)) {
            table = catalog.createTable(name, schema, spec, props);
        } else {
            table = catalog.loadTable(name);
        }

        String topic = "arkevent";
        String servers = "kafka:9092";
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(
                topic, new SimpleStringSchema(), KafkaUtils.consumeProps(servers, "flink-consumer"));
        consumer.setStartFromEarliest();

        SingleOutputStreamOperator<RowData> dataStream =
                env.addSource(consumer).map(new MapFunction<String, RowData>() {
                    @Override
                    public RowData map(String value) throws Exception {
                        JSONObject dataJson = JSON.parseObject(value);
                        GenericRowData row = new GenericRowData(5);
                        row.setField(0, StringData.fromBytes(dataJson.getString("uid").getBytes()));
                        row.setField(1, dataJson.getLong("eventTime"));
                        row.setField(2, StringData.fromBytes(dataJson.getString("eventid").getBytes()));
                        row.setField(3, StringData.fromBytes(dataJson.getString("uuid").getBytes()));
                        row.setField(4, TimestampData.fromEpochMillis(dataJson.getLong("eventTime")));
                        return row;
                    }
                });
        // The uid is used when restarting the job from a savepoint.
        dataStream.uid("flink-consumer");

        TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());

        // Sink the stream into the Iceberg table.
        FlinkSink.forRowData(dataStream)
                .table(table)
                .tableLoader(tableLoader)
                .writeParallelism(1)
                .overwrite(true)
                .build();

        // Read the table back, print it, and write it out to a file.
        DataStream<RowData> batchData = FlinkSource.forRowData().env(env).tableLoader(tableLoader).build();
        batchData.print();
        batchData.writeAsCsv(
                tableLoader.loadTable().location().concat("/out/out.csv"), WriteMode.OVERWRITE, "\n", " ");

        // Execute the program.
        env.execute("Test Iceberg DataStream");
    }
}
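KafkaUtils comes from com.coomia.datalake.kafka and is not shown in the post. A minimal sketch of what its consumeProps helper presumably returns, using only standard Kafka consumer settings; the exact implementation is an assumption.

import java.util.Properties;

// Hypothetical stand-in for com.coomia.datalake.kafka.KafkaUtils, not the original implementation.
public class KafkaUtils {
    public static Properties consumeProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // FlinkKafkaConsumer applies its own DeserializationSchema (SimpleStringSchema above),
        // so only connection-level settings are needed here.
        return props;
    }
}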

Spark Structured Streaming: Kafka source to Iceberg sink

package com.zjyg.iceberg.main

import java.sql.Timestamp
import java.util.concurrent.TimeUnit

import com.alibaba.fastjson.JSON
import com.zjyg.iceberg.util.TimeUtil
import org.apache.iceberg.spark.IcebergSpark
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.DataTypes

object Kafka2IcebergV3 {
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String,
                  op_entrust_way: String, ywlx: String, ywzl: String)
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String,
                  countertype: String, tags: Tags, l_type: String, kafka: String, nodata: String,
                  dt: Int, hour: String)

  import org.apache.spark.sql.functions._

  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println(
        s"""
           |Usage: Kafka2Iceberg <brokers> <topics> <groupId>
           | <brokers> is a list of one or more Kafka brokers
           | <topics> is a list of one or more kafka topics to consume from
           | <groupId> is a kafka consumer groupId
        """.stripMargin)
      System.exit(1)
    }

    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointDir = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/spark"
    val checkpointDir_iceberg = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/icebergtb"

    try {
      val spark = SparkSession.builder()
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.iceberg.type", "hadoop")
        .config("spark.sql.catalog.iceberg", classOf[SparkCatalog].getName)
        .config("spark.sql.catalog.iceberg.warehouse", "hdfs:///warehouse/tablespace/external/iceberg")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()
      spark.sparkContext.setCheckpointDir(checkpointDir)
      // IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)

      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")

      import spark.implicits._

      val lines = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("kafka.security.protocol", "SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism", "GSSAPI")
        .option("kafka.sasl.kerberos.service.name", "kafka")
        .option("kafka.group.id", groupId)
        .option("subscribe", topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()

      val data = lines
        .map(row => row.getAs[String]("value").toString())
        .map(s => getLogs(s))
        .toDF()
      println("------PrintSchema data-------")
      data.printSchema()

      val tableIdentifier: String = "hdfs:///warehouse/tablespace/external/iceberg/iceberg_test/opentsdb_logsv3"
      val query = data.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", tableIdentifier)
        .option("checkpointLocation", checkpointDir_iceberg)
        .start()
      query.awaitTermination()
      spark.close()
    } catch {
      case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
      }
    }
  }

  def getTags(tags: String): Tags = {
    var result = Tags("", 0, "", "", "", "")
    try {
      val tags_columns = tags.split(',')
      val entrust_prop = (tags_columns(0).split('='))(1).toString
      val entrust_status = (tags_columns(1).split('='))(1).toInt
      val exchange_type = (tags_columns(2).split('='))(1).toString
      val op_entrust_way = (tags_columns(3).split('='))(1).toString
      val ywlx = (tags_columns(4).split('='))(1).toString
      val ywzl = (tags_columns(5).split('='))(1).toString
      result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
      }
    }
    return result
  }

  def getLogs(logs: String): Logs = {
    var v_tags = new Tags("", 0, "", "", "", "")
    var result = new Logs("", "", new Timestamp(0L), 0, "", "", v_tags, "", "", "", 0, "")
    try {
      val json = JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val ts_l = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(ts_l)
      val (dt: Int, hour: String) = TimeUtil.getTsYmdHour(ts_l)
      val step = json.getInteger("step")
      val l_value = json.getString("value")
      val countertype = json.getString("counterType")
      val j_tags = json.getString("tags")
      val tags = getTags(j_tags)
      val l_type = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      result = new Logs(metric, endpoint, ts, step, l_value, countertype, tags, l_type, kafka, nodata, dt, hour)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
      }
    }
    return result
  }
}
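TimeUtil.getTsYmdHour is an in-house helper that is not included in the post; the code above destructures its result into a yyyyMMdd integer (dt) and an hour string. A rough Java sketch of the presumed logic follows; the class name, method names, and the time zone are assumptions, and the real helper returns the two values as a Scala tuple.

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch of what TimeUtil.getTsYmdHour likely derives from an epoch-millis timestamp.
public class TimeUtilSketch {
    private static final ZoneId ZONE = ZoneId.of("Asia/Shanghai"); // assumed time zone

    // dt value, e.g. 20240131
    public static int ymd(long epochMillis) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZONE);
        return Integer.parseInt(t.format(DateTimeFormatter.ofPattern("yyyyMMdd")));
    }

    // hour value, e.g. "09"
    public static String hour(long epochMillis) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZONE);
        return t.format(DateTimeFormatter.ofPattern("HH"));
    }
}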

Flink: Iceberg small-file compaction

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.actions.Actions;

import java.util.HashMap;
import java.util.Map;

public class FlinkCompaction {

    public static void main(String[] args) throws Exception {
        ParameterTool tool = ParameterTool.fromArgs(args);

        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hive");
        properties.put("property-version", "1");
        properties.put("warehouse", tool.get("warehouse"));
        properties.put("uri", tool.get("uri"));
        if (tool.has("oss.endpoint")) {
            properties.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
            properties.put("oss.endpoint", tool.get("oss.endpoint"));
            properties.put("oss.access.key.id", tool.get("oss.access.key.id"));
            properties.put("oss.access.key.secret", tool.get("oss.access.key.secret"));
        }

        CatalogLoader loader =
                CatalogLoader.hive(tool.get("catalog"), new Configuration(), properties);
        Catalog catalog = loader.loadCatalog();
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of(tool.get("db")), tool.get("table"));
        Table table = catalog.loadTable(identifier);

        /*
         * Compact small data files: the core of this job.
         */
        Actions.forTable(table)
                .rewriteDataFiles()
                .maxParallelism(5)
                .targetSizeInBytes(128 * 1024 * 1024)
                .execute();

        // Expire snapshots older than the current one so the replaced files can be cleaned up.
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
        }
    }
}
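This compaction job is also driven by ParameterTool. A small launcher sketch with the expected keys; all values are placeholders, and the OSS keys are only needed when the table data lives on Aliyun OSS.

// Hypothetical launcher for FlinkCompaction; key names come from the tool.get(...) calls above.
public class FlinkCompactionLocalRun {
    public static void main(String[] cliArgs) throws Exception {
        String[] args = {
                "--catalog", "hive_catalog",
                "--warehouse", "hdfs:///warehouse/iceberg",
                "--uri", "thrift://metastore-host:9083",
                "--db", "demo_db",
                "--table", "demo_tb"
                // optionally: --oss.endpoint, --oss.access.key.id, --oss.access.key.secret
        };
        FlinkCompaction.main(args);
    }
}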

Spark: Iceberg small-file compaction

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;

import java.util.HashMap;
import java.util.Map;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;

/**
 * @author: zhushang
 * @create: 2021-04-02 14:30
 */
public class SparkCompaction {

    public static void main(String[] args) {
        TableIdentifier identifier = TableIdentifier.of(Namespace.of("db"), "table");

        Map<String, String> config = new HashMap<>();
        config.put("type", "iceberg");
        config.put("catalog-type", "hive");
        config.put("property-version", "1");
        config.put("warehouse", "warehouse");
        config.put("uri", "thrift://local:9083");
        config.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
        config.put("oss.endpoint", "https://xxx.aliyuncs.com");
        config.put("oss.access.key.id", "key");
        config.put("oss.access.key.secret", "secret");

        sparkSession();

        HiveCatalog hiveCatalog = new HiveCatalog(new Configuration());
        hiveCatalog.initialize("iceberg_hive_catalog", config);
        Table table = hiveCatalog.loadTable(identifier);

        // Compact small data files.
        Actions.forTable(table)
                .rewriteDataFiles()
                .targetSizeInBytes(128 * 1024 * 1024)
                .execute();

        // Expire snapshots older than the current one.
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
        }
    }

    private static void sparkSession() {
        SparkSession.builder()
                .master("local[*]")
                .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
                .config("spark.hadoop." + METASTOREURIS.varname, "thrift://localhost:9083")
                .config("spark.sql.warehouse.dir", "warehouse")
                .config("spark.executor.heartbeatInterval", "100000")
                .config("spark.network.timeoutInterval", "100000")
                .enableHiveSupport()
                .getOrCreate();
    }
}
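To check what the rewrite actually committed, the summary of the newest snapshot can be printed after the action runs. This is a small illustrative fragment (not part of the original post) that assumes the table variable from the code above.

// Print the summary of the snapshot produced by the rewrite, e.g. added/deleted data file counts.
Snapshot current = table.currentSnapshot();
if (current != null) {
    current.summary().forEach((k, v) -> System.out.println(k + " = " + v));
}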

Spark Structured Streaming: OpenTSDB logs from Kafka to Iceberg sink

package com.zjyg.iceberg.spark

import java.sql.Timestamp
import java.util.concurrent.TimeUnit

import com.alibaba.fastjson.JSON
import com.zjyg.iceberg.util.TimeUtil
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object OpentsdbLog2Iceberg {
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String,
                  op_entrust_way: String, ywlx: String, ywzl: String)
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String,
                  countertype: String, tags: Tags, l_type: String, kafka: String, nodata: String,
                  dt: Int, hour: String, row_info: String)

  def main(args: Array[String]): Unit = {
    if (args.length < 9) {
      System.err.println(
        s"""
           |Usage: OpentsdbLog2Iceberg <brokers> <topics> <groupId> <checkpointBaseDir> ...
           | <1. brokers> is a list of one or more Kafka brokers
           | <2. topics> is a list of one or more kafka topics to consume from
           | <3. groupId> is a kafka consumer groupId
           | <4. checkpointBaseDir> is a checkpoint dir for job
           | <5. warehouseBaseDir> is a warehouse base dir for iceberg
           | <6. catalogName> is a catalogName for iceberg
           | <7. dbName> is a DatabaseName for iceberg
           | <8. tbName> is a TableName for iceberg
           | <9. kafkaServiceName> is a Kafka Service Name
        """.stripMargin)
      System.exit(1)
    }

    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointBaseDir = args(3) // hdfs:///user/bigdata/sparkStreamingCheckpoint
    val warehouseBaseDir = args(4)  // hdfs:///warehouse/tablespace/external
    val catalogName = args(5)       // iceberg
    val dbName = args(6)            // iceberg_test
    val tbName = args(7)            // opentsdb_logs
    val kafkaServiceName = args(8)  // hdp-kafka

    val checkpointDir_spark = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/spark"
    val checkpointDir_iceberg = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/iceberg"

    try {
      val spark = SparkSession.builder()
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config(s"spark.sql.catalog.${catalogName}.type", "hadoop")
        .config(s"spark.sql.catalog.${catalogName}", classOf[SparkCatalog].getName)
        .config(s"spark.sql.catalog.${catalogName}.warehouse", s"${warehouseBaseDir}/${catalogName}")
        .getOrCreate()
      spark.sparkContext.setCheckpointDir(checkpointDir_spark)

      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")

      import spark.implicits._

      val lines = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .option("kafka.security.protocol", "SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism", "GSSAPI")
        .option("kafka.sasl.kerberos.service.name", s"${kafkaServiceName}")
        .option("kafka.group.id", groupId)
        .option("subscribe", topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()

      val data = lines
        .map(row => getLogs(row.getAs[Timestamp]("timestamp"), row.getAs[String]("value").toString()))
        .toDF()
        .repartition($"dt", $"hour")
      println("------PrintSchema data-------")
      data.printSchema()

      val tableIdentifier: String = s"${warehouseBaseDir}/${catalogName}/${dbName}/${tbName}"
      val query = data.writeStream
        .format("iceberg")
        .partitionBy("dt", "hour")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", tableIdentifier)
        .option("fanout-enabled", "true")
        .option("checkpointLocation", checkpointDir_iceberg)
        .start()
      query.awaitTermination()
      spark.close()
    } catch {
      case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
      }
    }
  }

  def getTags(tags: String): Tags = {
    var result = Tags("", 0, "", "", "", "")
    try {
      val tags_columns = tags.split(',')
      val entrust_prop = (tags_columns(0).split('='))(1).toString
      val entrust_status = (tags_columns(1).split('='))(1).toInt
      val exchange_type = (tags_columns(2).split('='))(1).toString
      val op_entrust_way = (tags_columns(3).split('='))(1).toString
      val ywlx = (tags_columns(4).split('='))(1).toString
      val ywzl = (tags_columns(5).split('='))(1).toString
      result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // System.exit(1)
      }
    }
    return result
  }

  def getLogs(kafkaTimestamp: Timestamp, logs: String): Logs = {
    var tags = new Tags("", 0, "", "", "", "")
    var result = new Logs("", "", new Timestamp(0L), 0, "", "", tags, "", "", "", 19700101, "00", "")
    try {
      val json = JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val v_ts = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(v_ts)
      val (dt: Int, hour: String) = TimeUtil.getTsYmdHour(v_ts)
      val step = json.getInteger("step")
      val v_value = json.getString("value")
      val countertype = json.getString("counterType")
      val tags = getTags(json.getString("tags"))
      val v_type = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      result = new Logs(metric, endpoint, ts, step, v_value, countertype, tags, v_type, kafka, nodata, dt, hour, "")
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // System.exit(1)
        val (dt, hour) = TimeUtil.tsToDateHour(kafkaTimestamp)
        result = new Logs("", "", new Timestamp(0L), 0, "", "", tags, "", "", "", -dt, hour, logs)
      }
    }
    return result
  }
}




