
Multiple Ways to Delete Data in Hudi

OLAP 2021-09-07

1. Ways to delete data

  • Add a '_HOODIE_IS_DELETED' column with the value true to the records to be deleted

  • Use the partition-level delete API

  • Use the record-level delete API

  • Delete data with DeltaStreamer

2. Core configuration

    hoodie.datasource.write.operation = "delete_partition"
    Required when using partition-level deletes through the Spark DataSource.

    hoodie.datasource.write.partitions.to.delete = "partitionValue_1,partitionValue_2,partitionValue_3"
    With this option set, you only pass the partition values to delete and do not need to build a DataFrame.
    Without it, you must build a DataFrame that contains the record key and partition fields.

    hoodie.datasource.write.operation = "delete"
    Required when using record-level deletes through the Spark DataSource.

3. Examples

3.1 Partition-level delete

Partition-level deletes come in two flavors: one that does not depend on DataFrame data, and one that does.

3.1.1 Without DataFrame data

The DataFrame-free variant only needs the following options on top of the usual write configuration:

1. hoodie.datasource.write.operation = delete_partition

2. hoodie.datasource.write.partitions.to.delete = the partition values to delete

    // Append comes from org.apache.spark.sql.SaveMode; an empty DataFrame is
    // enough, since the partitions to delete come from the option below.
    import org.apache.spark.sql.SaveMode.Append

    val df = spark.emptyDataFrame
    df.write.format("org.apache.hudi").
      option("hoodie.insert.shuffle.parallelism", "2").
      option("hoodie.upsert.shuffle.parallelism", "2").
      option("hoodie.bulkinsert.shuffle.parallelism", "2").
      option("hoodie.delete.shuffle.parallelism", "2").
      option("hoodie.table.name", "tableName").
      option("hoodie.datasource.write.partitionpath.field", "partitionpath").
      option("hoodie.datasource.write.operation", "delete_partition").
      option("hoodie.datasource.write.partitions.to.delete", "partitionValue_1,partitionValue_2").
      mode(Append).
      save(tablePath)

3.1.2 With DataFrame data

The DataFrame-based variant needs a DataFrame containing the record key and partition fields, plus the following option:

hoodie.datasource.write.operation = delete_partition

    // Build a DataFrame containing the partition field and the record key
    val df = spark.sql("select uuid, partitionpath from hudi_table")
    df.write.format("org.apache.hudi").
      option("hoodie.insert.shuffle.parallelism", "2").
      option("hoodie.upsert.shuffle.parallelism", "2").
      option("hoodie.bulkinsert.shuffle.parallelism", "2").
      option("hoodie.delete.shuffle.parallelism", "2").
      option("hoodie.table.name", "tableName").
      option("hoodie.datasource.write.recordkey.field", "uuid").
      option("hoodie.datasource.write.partitionpath.field", "partitionpath").
      option("hoodie.datasource.write.operation", "delete_partition").
      mode(Append).
      save(tablePath)

3.2 Record-level delete

Record-level deletes also come in two flavors: one prepares the data set to delete ahead of time; the other adds a '_HOODIE_IS_DELETED' column with the value true to the data.

3.2.1 With DataFrame data

The first record-level variant is configured much like the DataFrame-based partition-level delete: build a DataFrame containing the record key and partition fields, and use the following option:

hoodie.datasource.write.operation = delete

    val df = spark.sql("select uuid, partitionpath from hudi_table")
    df.write.format("org.apache.hudi").
      option("hoodie.insert.shuffle.parallelism", "2").
      option("hoodie.upsert.shuffle.parallelism", "2").
      option("hoodie.bulkinsert.shuffle.parallelism", "2").
      option("hoodie.delete.shuffle.parallelism", "2").
      option("hoodie.datasource.write.recordkey.field", "uuid").
      option("hoodie.datasource.write.partitionpath.field", "partitionpath").
      option("hoodie.table.name", "tableName").
      option("hoodie.datasource.write.operation", "delete").
      mode(Append).
      save(tablePath)

3.2.2 Schema-based approach

The second record-level variant adds a '_hoodie_is_deleted' column with the value true to the data.

    // Add this field to the DataFrame's schema. If the value is false or the
    // column is absent, the record is treated as a regular write; if the value
    // is true, the record is deleted.
    StructField("_hoodie_is_deleted", DataTypes.BooleanType, true, Metadata.empty())


    dataFrame.write.format("org.apache.hudi").
      option("hoodie.table.name", "test123").
      option("hoodie.datasource.write.operation", "upsert").
      option("hoodie.datasource.write.recordkey.field", "uuid").
      option("hoodie.datasource.write.partitionpath.field", "partitionpath").
      option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE").
      option("hoodie.datasource.write.precombine.field", "ts").
      mode(Append).
      save(basePath)
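
Rather than hand-building the schema, the flag can also be attached to an existing DataFrame with `withColumn`. A minimal sketch, assuming the `hudi_table` view and column names from the earlier examples:

```scala
import org.apache.spark.sql.functions.lit

// Select the rows to remove and mark them with the delete flag; on the
// next upsert, Hudi drops every record whose _hoodie_is_deleted is true.
val toDelete = spark.sql("select * from hudi_table where uuid = '69cdb048'")
  .withColumn("_hoodie_is_deleted", lit(true))
// toDelete is then written back with the same upsert options shown above.
```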

3.2.3 DeltaStreamer approach

Deleting records with DeltaStreamer works much like the schema-based approach in 3.2.2:

the schema file used by DeltaStreamer must include '_hoodie_is_deleted', and the records to delete must carry the value true.


schema:

    {
      "type": "record",
      "name": "schema",
      "fields": [{
        "name": "uuid",
        "type": "string"
      }, {
        "name": "ts",
        "type": "string"
      }, {
        "name": "partitionpath",
        "type": "string"
      }, {
        "name": "_hoodie_is_deleted",
        "type": "boolean",
        "default": false
      }]
    }

data:

    {"ts": "0.0", "uuid": "69cdb048", "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted": true}
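
With that schema and payload in place, the delete is just an ordinary DeltaStreamer ingestion run that picks up the flagged records. A sketch of the submit command, where the bundle jar, base path, table name, props file, and source location are all placeholder assumptions, not from the original:

```shell
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
  --source-ordering-field ts \
  --target-base-path /path/to/hudi_table \
  --target-table hudi_table \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --props dfs-source.properties \
  --op UPSERT
```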
