
Flink + Iceberg integration in practice

大数据启示录 2022-03-01

Flink CDC PostgreSQL sink to Iceberg with DataStream


    package com.zjyg.iceberg.flink;


    import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
    import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import com.ververica.cdc.debezium.DebeziumSourceFunction;
    import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableColumn;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.inference.TypeTransformations;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.utils.DataTypeUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.BaseTable;
    import org.apache.iceberg.TableOperations;
    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.flink.CatalogLoader;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;
    import org.apache.iceberg.types.Types;


    import java.time.ZoneId;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;


    public class Pg2Iceberg {
        private static final Schema SCHEMA =
                new Schema(
                        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                        Types.NestedField.optional(2, "name", Types.StringType.get()),
                        Types.NestedField.optional(3, "age", Types.IntegerType.get()),
                        Types.NestedField.optional(4, "sex", Types.StringType.get()));

        public static void main(String[] args) throws Exception {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            checkpointConfig.setCheckpointStorage(
                    parameterTool.get("checkpoint_base_dir") + "/"
                            + parameterTool.get("catalog_name") + "."
                            + parameterTool.get("iceberg_db_name") + "."
                            + parameterTool.get("iceberg_tb_name"));
            checkpointConfig.setCheckpointInterval(60 * 1000L);
            checkpointConfig.setMinPauseBetweenCheckpoints(60 * 1000L);
            checkpointConfig.setTolerableCheckpointFailureNumber(10);
            checkpointConfig.setCheckpointTimeout(120 * 1000L);

            DataStreamSource<RowData> src = env.addSource(getPgCdc(parameterTool));
            icebergSink_hadoop(src, parameterTool);
            env.execute(parameterTool.get("app_name"));
        }

        private static void icebergSink_hadoop(DataStream<RowData> src, ParameterTool tool) {
            Map<String, String> properties = new HashMap<>();
            properties.put("type", "iceberg");
            properties.put("catalog-type", "hadoop");
            properties.put("property-version", "1");
            properties.put("warehouse", tool.get("warehouse_base_dir") + "/" + tool.get("catalog_name"));

            CatalogLoader catalogLoader =
                    CatalogLoader.hadoop(tool.get("catalog_name"), new Configuration(), properties);

            icebergSink(src, tool, catalogLoader);
        }


        private static void icebergSink(DataStream<RowData> input, ParameterTool tool, CatalogLoader loader) {
            Catalog catalog = loader.loadCatalog();

            TableIdentifier identifier =
                    TableIdentifier.of(Namespace.of(tool.get("iceberg_db_name")), tool.get("iceberg_tb_name"));
            Table table;
            if (catalog.tableExists(identifier)) {
                table = catalog.loadTable(identifier);
            } else {
                table = catalog.buildTable(identifier, SCHEMA)
                        .withPartitionSpec(PartitionSpec.unpartitioned())
                        .create();
            }

            // Upgrade the table to Iceberg format version 2 so the sink can write
            // the equality deletes needed for CDC upserts.
            TableOperations operations = ((BaseTable) table).operations();
            TableMetadata metadata = operations.current();
            operations.commit(metadata, metadata.upgradeToFormatVersion(2));

            TableLoader tableLoader = TableLoader.fromCatalog(loader, identifier);

            FlinkSink.forRowData(input)
                    .table(table)
                    .tableLoader(tableLoader)
                    .equalityFieldColumns(Arrays.asList("id"))
                    .writeParallelism(1)
                    .build();
        }


        private static SourceFunction<RowData> getPgCdc(ParameterTool tool) {
            TableSchema schema =
                    TableSchema.builder()
                            .add(TableColumn.physical("id", DataTypes.INT()))
                            .add(TableColumn.physical("name", DataTypes.STRING()))
                            .add(TableColumn.physical("age", DataTypes.INT()))
                            .add(TableColumn.physical("sex", DataTypes.STRING()))
                            .build();

            RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
            DebeziumDeserializationSchema<RowData> deserialer =
                    new RowDataDebeziumDeserializeSchema(
                            rowType,
                            createTypeInfo(schema.toRowDataType()),
                            (rowData, rowKind) -> {},
                            ZoneId.of("Asia/Shanghai"));

            Properties properties = new Properties();
            // Convert DECIMAL and similar types to strings on the Debezium side.
            properties.setProperty("decimal.handling.mode", "string");

            DebeziumSourceFunction<RowData> sourceFunction =
                    PostgreSQLSource.<RowData>builder()
                            .hostname(tool.get("db_host"))
                            .port(Integer.parseInt(tool.get("db_port")))
                            .database(tool.get("db_name"))
                            .schemaList(tool.get("schema_name"))
                            .tableList(tool.get("schema_name") + "." + tool.get("tb_name"))
                            .username(tool.get("db_user"))
                            .password(tool.get("db_user_pwd"))
                            .decodingPluginName("pgoutput")
                            .slotName(tool.get("SLOTNAME4"))
                            .deserializer(deserialer)
                            .debeziumProperties(properties)
                            .build();
            return sourceFunction;
        }


        private static TypeInformation<RowData> createTypeInfo(DataType producedDataType) {
            final DataType internalDataType =
                    DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);
            return (TypeInformation<RowData>)
                    TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(internalDataType);
        }
    }
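The job reads every setting through ParameterTool, which parses command-line arguments given as --key value pairs. A minimal sketch of how the arguments could be assembled (the keys are exactly the ones looked up in the code above; all values are placeholders):

    // Sketch only: keys match the ParameterTool lookups above, values are placeholders.
    String[] args = new String[] {
            "--app_name", "pg2iceberg",
            "--checkpoint_base_dir", "hdfs:///flink/checkpoints",
            "--warehouse_base_dir", "hdfs:///warehouse/iceberg",
            "--catalog_name", "hadoop_catalog",
            "--iceberg_db_name", "iceberg_db", "--iceberg_tb_name", "person",
            "--db_host", "127.0.0.1", "--db_port", "5432", "--db_name", "postgres",
            "--schema_name", "public", "--tb_name", "person",
            "--db_user", "postgres", "--db_user_pwd", "******",
            "--SLOTNAME4", "flink_cdc_slot"
    };
    ParameterTool parameterTool = ParameterTool.fromArgs(args);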



Flink Kafka sink to Iceberg with DataStream

      import java.util.Map;
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap;
      import org.apache.flink.core.fs.FileSystem.WriteMode;
      import org.apache.flink.streaming.api.TimeCharacteristic;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
      import org.apache.flink.table.data.GenericRowData;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.data.StringData;
      import org.apache.flink.table.data.TimestampData;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.iceberg.FileFormat;
      import org.apache.iceberg.PartitionSpec;
      import org.apache.iceberg.Schema;
      import org.apache.iceberg.Table;
      import org.apache.iceberg.TableProperties;
      import org.apache.iceberg.catalog.Catalog;
      import org.apache.iceberg.catalog.TableIdentifier;
      import org.apache.iceberg.flink.TableLoader;
      import org.apache.iceberg.flink.sink.FlinkSink;
      import org.apache.iceberg.flink.source.FlinkSource;
      import org.apache.iceberg.hadoop.HadoopCatalog;
      import org.apache.iceberg.types.Types;
      import com.alibaba.fastjson.JSON;
      import com.alibaba.fastjson.JSONObject;
      import com.coomia.datalake.kafka.KafkaUtils;


      public class FlinkWriteIcebergTest {

          public static void main(String[] args) throws Exception {
              System.setProperty("HADOOP_USER_NAME", "root");
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
              env.getConfig().setAutoWatermarkInterval(5000L);
              env.setParallelism(1);

              // iceberg catalog identification.
              Configuration conf = new Configuration();
              Catalog catalog = new HadoopCatalog(conf);

              // iceberg table identification.
              TableIdentifier name =
                      TableIdentifier.of("default", "iceberg-tb-" + System.currentTimeMillis());

              // iceberg table schema identification.
              Schema schema = new Schema(
                      Types.NestedField.required(1, "uid", Types.StringType.get()),
                      Types.NestedField.required(2, "eventTime", Types.LongType.get()),
                      Types.NestedField.required(3, "eventid", Types.StringType.get()),
                      Types.NestedField.optional(4, "uuid", Types.StringType.get()),
                      Types.NestedField.required(5, "ts", Types.TimestampType.withoutZone()));




              // iceberg table partition identification.
              // PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uid", 5).build();
              PartitionSpec spec = PartitionSpec.unpartitioned();

              // identify using orc format as storage.
              Map<String, String> props =
                      ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());

              // create an iceberg table if not exists, otherwise, load it.
              Table table = null;
              if (!catalog.tableExists(name)) {
                  table = catalog.createTable(name, schema, spec, props);
              } else {
                  table = catalog.loadTable(name);
              }


      String topic = "arkevent";
      String servers = "kafka:9092";


      FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topic,
      new SimpleStringSchema(), KafkaUtils.consumeProps(servers, "flink-consumer"));
      consumer.setStartFromEarliest();


      SingleOutputStreamOperator<RowData> dataStream =
      env.addSource(consumer).map(new MapFunction<String, RowData>() {


      @Override
      public RowData map(String value) throws Exception {
      JSONObject dataJson = JSON.parseObject(value);
      GenericRowData row = new GenericRowData(5);
      row.setField(0, StringData.fromBytes(dataJson.getString("uid").getBytes()));
      row.setField(1, dataJson.getLong("eventTime"));
      row.setField(2, StringData.fromBytes(dataJson.getString("eventid").getBytes()));
      row.setField(3, StringData.fromBytes(dataJson.getString("uuid").getBytes()));
      row.setField(4, TimestampData.fromEpochMillis(dataJson.getLong("eventTime")));
      return row;
      }




      });
      // uid is used for job restart or something when using savepoint.
      dataStream.uid("flink-consumer");


              TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());

              // sink data to iceberg table
              FlinkSink.forRowData(dataStream)
                      .table(table)
                      .tableLoader(tableLoader)
                      .writeParallelism(1)
                      .overwrite(true)
                      .build();

              // read and write to file.
              DataStream<RowData> batchData =
                      FlinkSource.forRowData().env(env).tableLoader(tableLoader).build();
              batchData.print();
              batchData.writeAsCsv(tableLoader.loadTable().location().concat("/out/out.csv"),
                      WriteMode.OVERWRITE, "\n", " ");

              // Execute the program.
              env.execute("Test Iceberg DataStream");
          }
      }
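The example depends on a small helper, com.coomia.datalake.kafka.KafkaUtils, that is not shown here. A minimal sketch of what its consumeProps(servers, groupId) method presumably returns, assuming it only supplies connection-level consumer settings (record deserialization is already handled by SimpleStringSchema in the job):

      import java.util.Properties;

      public class KafkaUtils {
          // Hypothetical stand-in for com.coomia.datalake.kafka.KafkaUtils#consumeProps;
          // the real class is not included in the article.
          public static Properties consumeProps(String servers, String groupId) {
              Properties props = new Properties();
              props.setProperty("bootstrap.servers", servers);
              props.setProperty("group.id", groupId);
              // Start offsets are controlled by consumer.setStartFromEarliest() in the job itself.
              return props;
          }
      }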

Spark Kafka sink to Iceberg

        package com.zjyg.iceberg.main


        import com.zjyg.iceberg.util.TimeUtil
        import org.apache.spark.sql.streaming.Trigger
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.types.DataTypes


        import org.apache.iceberg.spark.SparkCatalog
        import org.apache.iceberg.spark.IcebergSpark


        import java.sql.Timestamp
        import java.util.concurrent.TimeUnit


        import com.alibaba.fastjson.JSON




        object Kafka2IcebergV3 {
        case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
        case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type:String, kafka: String, nodata: String, dt: Int, hour: String)


        import org.apache.spark.sql.functions._


        def main(args: Array[String]): Unit = {
        if(args.length < 3) {
        System.err.println(
        s"""
        |Usage: Kafka2IcebergV3 <brokers> <topics> <groupId>
        | <brokers> is a list of one or more Kafka brokers
        | <topics> is a list of one or more Kafka topics to consume from
        | <groupId> is a Kafka consumer groupId
        """.stripMargin)
        System.exit(1)
        }


        val brokers = args(0)
        val topics = args(1)
        val groupId = args(2)


        val checkpointDir = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/spark"
        val checkpointDir_iceberg = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/icebergtb"


        try {
        val spark = SparkSession.builder()
        .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.iceberg.type","hadoop")
        .config("spark.sql.catalog.iceberg",classOf[SparkCatalog].getName)
        .config("spark.sql.catalog.iceberg.warehouse","hdfs:///warehouse/tablespace/external/iceberg")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()
        spark.sparkContext.setCheckpointDir(checkpointDir)


        // IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)




        System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
        System.setProperty("java.security.krb5.conf", "./krb5.conf")


        import spark.implicits._


        val lines = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("kafka.security.protocol","SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism","GSSAPI")
        .option("kafka.sasl.kerberos.service.name","kafka")
        .option("kafka.group.id",groupId)
        .option("subscribe",topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)


        println("------PrintSchema lines-------")
        lines.printSchema()


        val data = lines.map(
        row => row.getAs[String]("value").toString()
        ).map(s => getLogs(s)).toDF()


        println("------PrintSchema data-------")
        data.printSchema()


        val tableIdentifier: String = "hdfs:///warehouse/tablespace/external/iceberg/iceberg_test/opentsdb_logsv3"
        val query = data.writeStream.format("iceberg").outputMode("append").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).option("path", tableIdentifier).option("checkpointLocation", checkpointDir_iceberg).start()
        query.awaitTermination()
        spark.close()


        } catch {
        case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
        }
        }
        }


        def getTags(tags:String):Tags ={
        var result = Tags("",0,"","","","")
        try {
        val tags_columns = tags.split(',')
        val entrust_prop = (tags_columns(0).split('='))(1).toString
        val entrust_status = (tags_columns(1).split('='))(1).toInt
        val exchange_type = (tags_columns(2).split('='))(1).toString
        val op_entrust_way = (tags_columns(3).split('='))(1).toString
        val ywlx = (tags_columns(4).split('='))(1).toString
        val ywzl = (tags_columns(5).split('='))(1).toString
        result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
        } catch {
        case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
        }
        }
        return result
        }


        def getLogs(logs:String):Logs = {
        var v_tags = new Tags("",0,"","","","")
        var result = new Logs("","",new Timestamp(0L),0,"","",v_tags,"","","",0,"")
        try {
        val json=JSON.parseObject(logs)


        val metric = json.getString("metric")
        val endpoint = json.getString("endpoint")
        val ts_l = json.getInteger("timestamp") * 1000L
        val ts = new Timestamp(ts_l)
        val (dt:Int, hour:String) = TimeUtil.getTsYmdHour(ts_l)
        val step = json.getInteger("step")
        val l_value = json.getString("value")
        val countertype = json.getString("counterType")
        val j_tags = json.getString("tags")
        val tags = getTags(j_tags)
        val l_type = json.getString("type")
        val kafka = json.getString("kafka")
        val nodata = json.getString("nodata")
        result = new Logs(metric,endpoint,ts,step,l_value,countertype,tags,l_type,kafka,nodata,dt,hour)
        } catch {
        case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
        }
        }
        return result
        }


        }

Flink Iceberg compaction

          import org.apache.flink.api.java.utils.ParameterTool;
          import org.apache.hadoop.conf.Configuration;
          import org.apache.iceberg.Snapshot;
          import org.apache.iceberg.Table;
          import org.apache.iceberg.catalog.Catalog;
          import org.apache.iceberg.catalog.Namespace;
          import org.apache.iceberg.catalog.TableIdentifier;
          import org.apache.iceberg.flink.CatalogLoader;
          import org.apache.iceberg.flink.actions.Actions;


          import java.util.HashMap;
          import java.util.Map;


          public class FlinkCompaction {
              public static void main(String[] args) throws Exception {
                  ParameterTool tool = ParameterTool.fromArgs(args);
                  Map<String, String> properties = new HashMap<>();
                  properties.put("type", "iceberg");
                  properties.put("catalog-type", "hive");
                  properties.put("property-version", "1");
                  properties.put("warehouse", tool.get("warehouse"));
                  properties.put("uri", tool.get("uri"));
                  if (tool.has("oss.endpoint")) {
                      properties.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
                      properties.put("oss.endpoint", tool.get("oss.endpoint"));
                      properties.put("oss.access.key.id", tool.get("oss.access.key.id"));
                      properties.put("oss.access.key.secret", tool.get("oss.access.key.secret"));
                  }

                  CatalogLoader loader =
                          CatalogLoader.hive(tool.get("catalog"), new Configuration(), properties);
                  Catalog catalog = loader.loadCatalog();

                  TableIdentifier identifier =
                          TableIdentifier.of(Namespace.of(tool.get("db")), tool.get("table"));

                  Table table = catalog.loadTable(identifier);

                  // Rewrite (compact) small data files; this is the core of the job.
                  Actions.forTable(table)
                          .rewriteDataFiles()
                          .maxParallelism(5)
                          .targetSizeInBytes(128 * 1024 * 1024)
                          .execute();

                  // Expire snapshots older than the current one so the replaced small files
                  // can eventually be cleaned up.
                  Snapshot snapshot = table.currentSnapshot();
                  if (snapshot != null) {
                      table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
                  }
              }
          }
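Like the CDC job above, the compaction job takes its connection details from ParameterTool as --key value pairs. A sketch of the expected arguments (keys are the ones read above; the oss.* block only applies when the table data lives on Aliyun OSS, and every value is a placeholder):

          // Sketch only: keys match the ParameterTool lookups above, values are placeholders.
          String[] args = new String[] {
                  "--catalog", "hive_catalog",
                  "--warehouse", "hdfs:///warehouse/tablespace/external/iceberg",
                  "--uri", "thrift://hive-metastore:9083",
                  "--db", "iceberg_db",
                  "--table", "person",
                  // optional, only for tables stored on Aliyun OSS
                  "--oss.endpoint", "https://xxx.aliyuncs.com",
                  "--oss.access.key.id", "key",
                  "--oss.access.key.secret", "secret"
          };
          ParameterTool tool = ParameterTool.fromArgs(args);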


Spark Iceberg compaction


            import org.apache.hadoop.conf.Configuration;
            import org.apache.iceberg.Snapshot;
            import org.apache.iceberg.Table;
            import org.apache.iceberg.actions.Actions;
            import org.apache.iceberg.catalog.Namespace;
            import org.apache.iceberg.catalog.TableIdentifier;
            import org.apache.iceberg.hive.HiveCatalog;
            import org.apache.spark.sql.SparkSession;
            import org.apache.spark.sql.internal.SQLConf;


            import java.util.HashMap;
            import java.util.Map;


            import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;


            /**
             * @author: zhushang
             * @create: 2021-04-02 14:30
             */
            public class SparkCompaction {
                public static void main(String[] args) {
                    TableIdentifier identifier = TableIdentifier.of(Namespace.of("db"), "table");

                    Map<String, String> config = new HashMap<>();
                    config.put("type", "iceberg");
                    config.put("catalog-type", "hive");
                    config.put("property-version", "1");
                    config.put("warehouse", "warehouse");
                    config.put("uri", "thrift://local:9083");
                    config.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
                    config.put("oss.endpoint", "https://xxx.aliyuncs.com");
                    config.put("oss.access.key.id", "key");
                    config.put("oss.access.key.secret", "secret");

                    sparkSession();
                    HiveCatalog hiveCatalog = new HiveCatalog(new Configuration());
                    hiveCatalog.initialize("iceberg_hive_catalog", config);

                    Table table = hiveCatalog.loadTable(identifier);

                    Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(128 * 1024 * 1024).execute();

                    Snapshot snapshot = table.currentSnapshot();
                    if (snapshot != null) {
                        table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
                    }
                }

                private static void sparkSession() {
                    SparkSession.builder()
                            .master("local[*]")
                            .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
                            .config("spark.hadoop." + METASTOREURIS.varname, "localhost:9083")
                            .config("spark.sql.warehouse.dir", "warehouse")
                            .config("spark.executor.heartbeatInterval", "100000")
                            .config("spark.network.timeoutInterval", "100000")
                            .enableHiveSupport()
                            .getOrCreate();
                }
            }


Spark OpenTSDB sink to Iceberg

              package com.zjyg.iceberg.spark


              import java.sql.Timestamp
              import java.util.concurrent.TimeUnit


              import com.alibaba.fastjson.JSON
              import com.zjyg.iceberg.util.TimeUtil
              import org.apache.iceberg.spark.SparkCatalog
              import org.apache.spark.sql.SparkSession
              import org.apache.spark.sql.streaming.Trigger


              object OpentsdbLog2Iceberg {
              case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
              case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type:String, kafka: String, nodata: String, dt: Int, hour: String,row_info: String)


              def main(args: Array[String]): Unit = {
              if(args.length < 9) {
              System.err.println(
              s"""
              |Usage: OpentsdbLog2Iceberg <brokers> <topics> <groupId> <checkpointBaseDir> ...
              | <1. brokers> is a list of one or more Kafka brokers
              | <2. topics> is a list of one or more Kafka topics to consume from
              | <3. groupId> is a Kafka consumer groupId
              | <4. checkpointBaseDir> is a checkpoint dir for job
              | <5. warehouseBaseDir> is a warehouse base dir for iceberg
              | <6. catalogName> is a catalogName for iceberg
              | <7. dbName> is a DatabaseName for iceberg
              | <8. tbName> is a TableName for iceberg
              | <9. kafkaServiceName> is a Kafka Service Name
              """.stripMargin)
              System.exit(1)
              }


              val brokers = args(0)
              val topics = args(1)
              val groupId = args(2)
              val checkpointBaseDir = args(3) // hdfs:///user/bigdata/sparkStreamingCheckpoint
              val warehouseBaseDir = args(4) // hdfs:///warehouse/tablespace/external
              val catalogName = args(5) // iceberg
              val dbName = args(6) // iceberg_test
              val tbName = args(7) // opentsdb_logs
              val kafkaServiceName = args(8) // hdp-kafka


              val checkpointDir_spark = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/spark"
              val checkpointDir_iceberg = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/iceberg"


              try {
              val spark = SparkSession.builder()
              .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
              .config(s"spark.sql.catalog.${catalogName}.type","hadoop")
              .config(s"spark.sql.catalog.${catalogName}",classOf[SparkCatalog].getName)
              .config(s"spark.sql.catalog.${catalogName}.warehouse",s"${warehouseBaseDir}/${catalogName}")
              .getOrCreate()
              spark.sparkContext.setCheckpointDir(checkpointDir_spark)


              System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
              System.setProperty("java.security.krb5.conf", "./krb5.conf")


              import spark.implicits._


              val lines = spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", brokers)
              .option("startingOffsets", "earliest")
              .option("failOnDataLoss","false")
              .option("kafka.security.protocol","SASL_PLAINTEXT")
              .option("kafka.sasl.mechanism","GSSAPI")
              .option("kafka.sasl.kerberos.service.name",s"${kafkaServiceName}")
              .option("kafka.group.id",groupId)
              .option("subscribe",topics)
              .load()
              .withColumn("value", $"value".cast("string"))
              .filter($"value".isNotNull)


              println("------PrintSchema lines-------")
              lines.printSchema()


              val data = lines.map(
              row => getLogs(row.getAs[Timestamp]("timestamp"),row.getAs[String]("value").toString())
              ).toDF().repartition($"dt", $"hour")


              println("------PrintSchema data-------")
              data.printSchema()


              val tableIdentifier: String = s"${warehouseBaseDir}/${catalogName}/${dbName}/${tbName}"
              val query = data.writeStream
              .format("iceberg")
              .partitionBy("dt","hour")
              .outputMode("append")
              .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
              .option("path", tableIdentifier)
              .option("fanout-enabled", "true")
              .option("checkpointLocation", checkpointDir_iceberg)
              .start()
              query.awaitTermination()
              spark.close()


              } catch {
              case e: Exception => {
              System.err.println("exit. Exception is:" + e)
              System.exit(1)
              }
              }
              }


              def getTags(tags:String):Tags ={
              var result = Tags("",0,"","","","")
              try {
              val tags_columns = tags.split(',')
              val entrust_prop = (tags_columns(0).split('='))(1).toString
              val entrust_status = (tags_columns(1).split('='))(1).toInt
              val exchange_type = (tags_columns(2).split('='))(1).toString
              val op_entrust_way = (tags_columns(3).split('='))(1).toString
              val ywlx = (tags_columns(4).split('='))(1).toString
              val ywzl = (tags_columns(5).split('='))(1).toString
              result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
              } catch {
              case e: Exception => {
              System.err.println("Exception is:" + e)
              // System.exit(1)
              }
              }
              return result
              }


              def getLogs(kafkaTimestamp:Timestamp,logs:String):Logs = {
              var tags = new Tags("",0,"","","","")
              var result = new Logs("","",new Timestamp(0L),0,"","",tags,"","","",19700101,"00","")
              try {
              val json=JSON.parseObject(logs)
              val metric = json.getString("metric")
              val endpoint = json.getString("endpoint")
              val v_ts = json.getInteger("timestamp") * 1000L
              val ts = new Timestamp(v_ts)
              val (dt:Int, hour:String) = TimeUtil.getTsYmdHour(v_ts)
              val step = json.getInteger("step")
              val v_value = json.getString("value")
              val countertype = json.getString("counterType")
              val tags = getTags(json.getString("tags"))
              val v_type = json.getString("type")
              val kafka = json.getString("kafka")
              val nodata = json.getString("nodata")
              result = new Logs(metric,endpoint,ts,step,v_value,countertype,tags,v_type,kafka,nodata,dt,hour,"")
              } catch {
              case e: Exception => {
              System.err.println("Exception is:" + e)
              // System.exit(1)
              val (dt,hour) = TimeUtil.tsToDateHour(kafkaTimestamp)
              result = new Logs("","",new Timestamp(0L),0,"","",tags,"","","",-dt,hour,logs)
              }
              }
              return result
              }
              }


